From dce32cb0d782baf84b237fcf1d884e4959879ab0 Mon Sep 17 00:00:00 2001
From: perezgonzalez-irene <iregon@noc.ac.uk>
Date: Tue, 21 Jan 2020 09:59:32 +0000
Subject: [PATCH] Second commit, full validation added back

---
 read.py                 |  5 ++---
 reader/get_sections.py  | 18 +++++++++++-------
 reader/read_sections.py |  2 +-
 reader/reader.py        | 17 +++++++++++------
 schemas/schemas.py      |  2 ++
 5 files changed, 27 insertions(+), 17 deletions(-)

diff --git a/read.py b/read.py
index b3a9d6f..0af38af 100644
--- a/read.py
+++ b/read.py
@@ -51,8 +51,8 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
         logging.error('Data source is empty (first argument to read()) ')
         return
     elif not os.path.isfile(source):
-        logging.error('Can\'t reach data source {} as a file'.format(source))
-        logging.info('Supported in-memory data sources are {}'.format(",".join(properties.supported_sources)))
+        logging.error('Could not open data source file {}'.format(source))
+        logging.info('Otherwise, supported in-memory data sources are {}'.format(",".join([ str(x) for x in properties.supported_sources])))
         return
     if not validate_arg('sections',sections,list):
         return
@@ -71,7 +71,6 @@ def read(source, data_model = None, data_model_path = None, sections = None,chun
     imodel = data_model if data_model else data_model_path
     logging.info("EXTRACTING DATA FROM MODEL: {}".format(imodel))
     data, valid = reader.read_model(source,schema, sections = sections, chunksize = chunksize, skiprows = skiprows)
-
     # 4. Create out data attributes
     logging.info("CREATING OUTPUT DATA ATTRIBUTES FROM DATA MODEL(S)")
     data_columns = [ x for x in data ] if isinstance(data,pd.DataFrame) else data.orig_options['names']
diff --git a/reader/get_sections.py b/reader/get_sections.py
index c4073d3..ef43f7f 100644
--- a/reader/get_sections.py
+++ b/reader/get_sections.py
@@ -184,8 +184,8 @@ def extract_sections(string_df):
     section_dict = dict()
     section_groups = [ d[x] for d in parsing_order for x in d.keys() ]
     sections = [item for sublist in section_groups for item in sublist]
+
     for section in sections:
-        #section_dict[section] = pd.DataFrame() # Index as initial size to help final merging
         section_dict[section] = pd.Series()
         thread_ids = [ x for x in threads.keys() if threads[x]['section'] == section ]
         for thread_id in thread_ids:
@@ -200,20 +200,24 @@ def get_sections(string_df, schema, read_sections):
     global sentinals, section_lens, sentinals_lens
     global parsing_order
     # Proceed to split sections if more than one
+    # else return section in a named column
     if len(schema['sections'].keys())> 1:
         section_lens = { section: schema['sections'][section]['header'].get('length') for section in schema['sections'].keys()}
         sentinals = { section: schema['sections'][section]['header'].get('sentinal') for section in schema['sections'].keys()}
         sentinals_lens = { section: len(sentinals.get(section)) if sentinals.get(section) else 0 for section in sentinals.keys()}
         parsing_order = schema['header']['parsing_order']
-        # Get sections separated
-        section_strings = extract_sections(string_df)
-        # Paste in order in a single dataframe with columns named as sections
-        # Do not include sections not asked for
+        # Get sections separated: section_dict has a key:value pair for each
+        # section in the data model. If the section does not exist in the data,
+        # the value is an empty pd.Series
+        section_dict = extract_sections(string_df)
+        # Paste in order (same order as read_sections) into a single dataframe
+        # with columns named after the sections:
+        # - Drop unwanted sections
+        # - Keep requested but non-existent sections
         df_out = pd.DataFrame()
         for section in read_sections:
-            df_out = pd.concat([df_out,section_strings[section].rename(section)],sort = False,axis=1)
+            df_out = pd.concat([df_out,section_dict[section].rename(section)],sort = False,axis=1)
     else:
-        # return section in named column
         df_out = string_df
         df_out.columns = read_sections
 
diff --git a/reader/read_sections.py b/reader/read_sections.py
index e95631a..5205c7b 100644
--- a/reader/read_sections.py
+++ b/reader/read_sections.py
@@ -62,7 +62,7 @@ def read_data(section_df,section_schema):
             section_df[element] = converters.get(section_dtypes.get(element))(section_df[element], **kwargs)
         section_valid[element] = missing | section_df[element].notna()
-    
+
     return section_df,section_valid
 
 def read_sections(sections_df, schema):
diff --git a/reader/reader.py b/reader/reader.py
index 737367b..796fa2c 100644
--- a/reader/reader.py
+++ b/reader/reader.py
@@ -59,11 +59,16 @@ def read_model(source,schema, sections = None, chunksize = None, skiprows = None
     valid_buffer = StringIO()
 
     for i,string_df in enumerate(TextParser):
-        # a. Get sections separated in a dataframe columns:
-        # one per column, only requested sections, ignore rest.
+        # a. Get a DF with sections separated in columns:
+        # - one section per column
+        # - only sections requested, ignore rest
+        # - requested NA sections as NaN columns
+        # - column order as in read_sections_list
         sections_df = get_sections.get_sections(string_df, schema, read_sections_list)
+
         # b. Read elements from sections: along data chunks, resulting data types
         # may vary if there are gaps; keep track of data types!
+        # Sections are parsed in the same order as sections_df.columns
         [data_df, valid_df, out_dtypesi ] = read_sections.read_sections(sections_df, schema)
         if i == 0:
             out_dtypes = copy.deepcopy(out_dtypesi)
@@ -73,16 +78,16 @@ def read_model(source,schema, sections = None, chunksize = None, skiprows = None
                 out_dtypes.update({ k:out_dtypesi.get(k) })
         # Save to buffer
         data_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
-        valid_df.to_csv(data_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
+        valid_df.to_csv(valid_buffer,header = False, mode = 'a', encoding = 'utf-8',index = False)
 
     # 4. OUTPUT DATA ----------------------------------------------------------
    # WE'LL NEED TO POSTPROCESS THIS WHEN READING MULTIPLE REPORTS PER LINE
     data_buffer.seek(0)
     valid_buffer.seek(0)
     logging.info("Wrapping output....")
-# Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
-# (source is either pd.io.parsers.TextFileReader or a file with chunksize specified on input):
-# This way it supports direct chunksize property inheritance if the input source was a pd.io.parsers.TextFileReader
+    # Chunksize from the imported TextParser if it is a pd.io.parsers.TextFileReader
+    # (source is either a pd.io.parsers.TextFileReader or a file with chunksize specified on input):
+    # this way the chunksize property is inherited directly when the input source is a pd.io.parsers.TextFileReader
     chunksize = TextParser.orig_options['chunksize'] if isinstance(TextParser,pd.io.parsers.TextFileReader) else None
     # 'datetime' is not a valid pandas dtype: it is only converted to datetime64[ns] on output (via parse_dates); specifying 'datetime' (of any kind) here would fail
     date_columns = [] # Needs to be the numeric index of the column, as it does not seem to work with tuples
diff --git a/schemas/schemas.py b/schemas/schemas.py
index f5dcf89..a5778ad 100644
--- a/schemas/schemas.py
+++ b/schemas/schemas.py
@@ -149,6 +149,8 @@ def df_schema(df_columns, schema, data_model, supp_section = None, supp_schema =
     for section in schema.get('sections'):
         if section == properties.dummy_level:
             flat_schema.update(schema['sections'].get(section).get('elements'))
+        elif schema['sections'].get(section).get('header').get('disable_read'):
+            flat_schema.update( { (section, section): {'column_type':'object'} })
         else:
             flat_schema.update( { (section, x): schema['sections'].get(section).get('elements').get(x) for x in schema['sections'].get(section).get('elements') })
 
--
GitLab
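
A note on the pasting logic changed in reader/get_sections.py above: the concat-with-rename pattern is what keeps requested but non-existent sections as all-NaN columns rather than dropping them. A minimal sketch of that behaviour, outside the patch and with invented section names and data:

import pandas as pd

# Stand-in for extract_sections() output: one string Series per section;
# 'supp' is requested but absent from the data, so it is an empty Series.
section_dict = {'core': pd.Series(['AAAA', 'BBBB']),
                'supp': pd.Series(dtype='object')}
read_sections = ['core', 'supp']

# Paste in the requested order, one column per section; the absent
# section yields an all-NaN column instead of disappearing.
df_out = pd.DataFrame()
for section in read_sections:
    df_out = pd.concat([df_out, section_dict[section].rename(section)], sort=False, axis=1)

print(df_out)
#    core supp
# 0  AAAA  NaN
# 1  BBBB  NaN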
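
The one-character fix in reader/reader.py (valid_df was being appended to data_buffer instead of valid_buffer) matters because data and validity masks accumulate chunk by chunk in separate StringIO buffers and are only re-read once parsing ends. A minimal sketch of that accumulation pattern, assuming toy in-memory chunks in place of the real TextParser:

import pandas as pd
from io import StringIO

data_buffer = StringIO()
valid_buffer = StringIO()

# Each chunk appends to its own buffer; appending valid_df to data_buffer
# (the bug fixed above) would interleave mask rows with data rows.
for chunk in [pd.DataFrame({'a': [1, 2]}), pd.DataFrame({'a': [3, 4]})]:
    valid_df = chunk.notna()                      # per-element validity mask
    chunk.to_csv(data_buffer, header=False, mode='a', index=False)
    valid_df.to_csv(valid_buffer, header=False, mode='a', index=False)

data_buffer.seek(0)
valid_buffer.seek(0)
data = pd.read_csv(data_buffer, names=['a'])      # 4 data rows
valid = pd.read_csv(valid_buffer, names=['a'])    # 4 matching mask rows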
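
The new disable_read branch in schemas/schemas.py maps a whole section to a single raw 'object' column in the flattened schema, keyed (section, section), instead of one entry per element. A sketch with a hypothetical two-section schema (names invented; the dummy_level branch is omitted):

# 'sect1' has disable_read set in its header, so it is kept as one raw
# 'object' column rather than being split into its elements.
schema = {'sections': {
    'sect0': {'header': {}, 'elements': {'lat': {'column_type': 'float32'}}},
    'sect1': {'header': {'disable_read': True}, 'elements': {}},
}}

flat_schema = {}
for section, body in schema['sections'].items():
    if body['header'].get('disable_read'):
        flat_schema[(section, section)] = {'column_type': 'object'}
    else:
        flat_schema.update({(section, x): body['elements'][x] for x in body['elements']})

# flat_schema == {('sect0', 'lat'): {'column_type': 'float32'},
#                 ('sect1', 'sect1'): {'column_type': 'object'}}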