|
| 1 | +import argparse |
| 2 | +import glob |
| 3 | +import json |
| 4 | +import logging |
| 5 | +import os |
| 6 | +import sys |
| 7 | +import traceback |
| 8 | +from datetime import datetime, timezone |
| 9 | + |
| 10 | +import numpy as np |
| 11 | +import pandas as pd |
| 12 | +from d3m.metadata.pipeline import Pipeline |
| 13 | +from d3m.metadata.problem import Problem |
| 14 | +from d3m.utils import yaml_load_all |
| 15 | + |
| 16 | +LOGGER = logging.getLogger(__name__) |
| 17 | +TUNING_PARAMETER = 'https://metadata.datadrivendiscovery.org/types/TuningParameter' |
| 18 | + |
| 19 | + |
def load_pipeline(pipeline):
    """Load a d3m ``Pipeline`` from a JSON or YAML file path.

    The format is chosen from the file extension: ``.json`` uses the JSON
    loader, anything else falls back to the YAML loader.
    """
    with open(pipeline) as pipeline_file:
        if pipeline.endswith('.json'):
            return Pipeline.from_json(pipeline_file)

        return Pipeline.from_yaml(pipeline_file)
| 28 | + |
| 29 | + |
def get_default_step_hyperparams(step):
    """Return a dict of each tunable hyperparameter of *step* to its default.

    Only hyperparameters tagged with the ``TuningParameter`` semantic type
    are included.
    """
    return {
        name: hyperparam.get_default()
        for name, hyperparam in step.get_all_hyperparams().items()
        if TUNING_PARAMETER in hyperparam.semantic_types
    }
| 39 | + |
| 40 | + |
def clean_hyperparams(pipeline):
    """Reset every tunable hyperparameter of *pipeline* to its default value.

    Mutates the pipeline's steps in place and returns the same pipeline
    object for convenience.
    """
    for step in pipeline.steps:
        defaults = get_default_step_hyperparams(step)

        for name, value in step.hyperparams.items():
            if name in defaults:
                value['data'] = defaults[name]

    return pipeline
| 50 | + |
| 51 | + |
def pipeline_to_template(pipeline_path):
    """Turn the pipeline stored at *pipeline_path* into a reusable template.

    Tunable hyperparameters are reset to defaults, and id/schema/created are
    normalized to fixed values so equal pipelines produce equal digests.
    """
    template = clean_hyperparams(load_pipeline(pipeline_path))

    template.id = ''
    template.schema = 'https://metadata.datadrivendiscovery.org/schemas/v0/pipeline.json'
    # Fixed timestamp keeps the template digest stable across runs.
    template.created = datetime(2016, 11, 11, 12, 30, tzinfo=timezone.utc)

    return template
| 61 | + |
| 62 | + |
def write_template(templates_path, template):
    """Serialize *template* to ``<templates_path>/<id>.json``.

    The file name is the first 12 characters of the template digest.
    """
    template_id = template.get_digest()[:12]
    destination = os.path.join(templates_path, template_id + '.json')

    with open(destination, 'w') as template_file:
        print("Creating template {}".format(destination))
        template.to_json(template_file)
| 70 | + |
| 71 | + |
def generate_templates(pipelines_path, templates_path):
    """Convert every pipeline in *pipelines_path* into a template file.

    Best-effort batch: a failure on one pipeline is logged (with which file
    failed) and the loop continues, so one broken file does not abort the
    whole batch.
    """
    for pipeline in os.listdir(pipelines_path):
        pipeline_path = os.path.join(pipelines_path, pipeline)
        try:
            template = pipeline_to_template(pipeline_path)
            write_template(templates_path, template)
        except Exception as ex:
            # Fix: the original printed only the bare exception to stdout,
            # losing which pipeline failed; log with context instead,
            # consistent with extract_meta_information.
            LOGGER.warning('Failed to create template from %s: %s', pipeline_path, ex)
| 80 | + |
| 81 | + |
def read_pipeline_run(pipeline_run_path):
    """Load all YAML documents in a pipeline-run file into a list.

    Fix: the original opened the file without a context manager, leaking the
    handle if ``yaml_load_all`` raised mid-stream.
    """
    with open(pipeline_run_path) as data:
        return list(yaml_load_all(stream=data))
| 92 | + |
| 93 | + |
def load_problem(root_path, phase):
    """Load the d3m ``Problem`` document for *phase* (e.g. ``TRAIN``) under *root_path*."""
    problem_doc = os.path.join(root_path, phase, 'problem_' + phase, 'problemDoc.json')
    return Problem.load(problem_uri=problem_doc)
| 97 | + |
| 98 | + |
def detect_data_modality(dataset_doc):
    """Infer the data modality from a ``datasetDoc.json`` file path.

    Rules, in order: a single resource means ``single_table``; an
    ``edgeList`` resource means ``graph``; the first resource type other
    than ``table``/``raw`` names the modality directly (e.g. ``image``);
    otherwise the dataset is ``multi_table``.
    """
    with open(dataset_doc) as doc_file:
        doc = json.load(doc_file)

    res_types = [resource['resType'] for resource in doc['dataResources']]

    if len(res_types) == 1:
        return 'single_table'

    for res_type in res_types:
        if res_type == 'edgeList':
            return 'graph'

        if res_type not in ('table', 'raw'):
            return res_type

    return 'multi_table'
| 117 | + |
| 118 | + |
def get_dataset_info(dataset_name, datasets_path):
    """Return ``(data_modality, task_type, task_subtype)`` for a dataset.

    Falls back to the ``<name>_MIN_METADATA`` folder when the plain dataset
    folder does not exist on disk. Task type/subtype are taken from the
    first two ``task_keywords`` of the TRAIN problem document, lowercased.
    """
    dataset_root = os.path.join(datasets_path, dataset_name)
    if not os.path.exists(dataset_root):
        dataset_root += '_MIN_METADATA'

    dataset_doc = os.path.join(dataset_root, 'TRAIN', 'dataset_TRAIN', 'datasetDoc.json')
    problem = load_problem('file://' + os.path.abspath(dataset_root), 'TRAIN')

    modality = detect_data_modality(dataset_doc)
    task_keywords = problem['problem']['task_keywords']

    return modality, task_keywords[0].name.lower(), task_keywords[1].name.lower()
| 136 | + |
| 137 | + |
def get_template_id(pipeline_id, pipelines_path, templates_path):
    """Build and persist the template for *pipeline_id*; return its short id.

    Raises ``ValueError`` if ``<pipelines_path>/<pipeline_id>.json`` does
    not exist. The returned id is the first 12 digest characters.
    """
    pipeline_path = os.path.join(pipelines_path, '{}.json'.format(pipeline_id))
    if not os.path.isfile(pipeline_path):
        raise ValueError('Can not find: {}'.format(pipeline_path))

    template = pipeline_to_template(pipeline_path)
    write_template(templates_path, template)

    return template.get_digest()[:12]
| 147 | + |
| 148 | + |
def produce_phase(pipeline_run):
    """Extract score metadata from a PRODUCE-phase pipeline run.

    Returns a dict with ``metric``, ``context`` and ``normalized_score``.
    Raises ``ValueError`` when the run carries more than one score.
    """
    scores = pipeline_run['run']['results']['scores']
    if len(scores) > 1:
        raise ValueError('This run has more than one score!')

    score = scores[0]

    return {
        'metric': score['metric']['metric'],
        'context': pipeline_run['context'],
        'normalized_score': score['normalized'],
    }
| 163 | + |
| 164 | + |
def extract_pipeline_run(pipeline_run, pipelines_path, templates_path, datasets_path):
    """Extract a flat metadata record from one pipeline-run document.

    Returns ``(result, succeed)`` where *result* is a dict of dataset,
    pipeline/template ids, modality, task type/subtype, phase and status,
    and *succeed* is the run's reported state (e.g. ``'SUCCESS'``,
    ``'FAILURE'``). For non-failed PRODUCE runs the score fields from
    ``produce_phase`` are merged in; if the scores are missing the phase is
    marked ``TIMEOUT`` instead.
    """
    dataset_id = pipeline_run['datasets'][0]['id']
    phase = pipeline_run['run']['phase']
    succeed = pipeline_run.get('status').get('state')
    pipeline_id = pipeline_run['pipeline']['id']

    # Strip the split suffix to recover the dataset folder name.
    if dataset_id.endswith('TRAIN'):
        dataset_name = dataset_id.replace('_dataset_TRAIN', '')
    else:
        dataset_name = dataset_id.replace('_dataset_SCORE', '')

    # TODO: Lazy Loader
    data_modality, task_type, task_subtype = get_dataset_info(dataset_name, datasets_path)

    template_id = get_template_id(pipeline_id, pipelines_path, templates_path)

    result = {
        'dataset': dataset_name,
        'pipeline_id': pipeline_id,
        'template_id': template_id,
        'modality': data_modality,
        'type': task_type,
        'subtype': task_subtype,
        'phase': phase,
        'succeed': succeed,
    }

    if phase == 'PRODUCE' and succeed != 'FAILURE':
        try:
            result.update(produce_phase(pipeline_run))
        # Fix: was a bare ``except:``, which also swallowed SystemExit and
        # KeyboardInterrupt; narrow to Exception.
        except Exception:
            # Missing scores on a non-failed PRODUCE run — presumably the
            # run timed out before scoring.
            result['phase'] = 'TIMEOUT'

    return result, succeed
| 201 | + |
| 202 | + |
def extract_meta_information(pipeline_runs, pipelines_path, templates_path, datasets_path):
    """Parse every pipeline-run file under *pipeline_runs* and extract metadata.

    Returns ``(results, discarded)``: *results* is a flat list of per-run
    metadata dicts from files whose last parsed run did not report
    ``FAILURE``; *discarded* collects the extracted lists of the files that
    did. Runs that raise during extraction are logged and skipped.
    """
    results = []
    discarded = []

    for pipeline_run_path in glob.glob(os.path.join(pipeline_runs, '*')):
        # Fix: the original called undefined ``load_pipeline_run`` (NameError);
        # the loader defined in this module is ``read_pipeline_run``. It also
        # shadowed the ``pipeline_runs`` argument with the loaded documents.
        run_documents = read_pipeline_run(pipeline_run_path)

        data_extracted = []
        failed = False

        for pipeline_run in run_documents:
            try:
                run_data, run_status = extract_pipeline_run(
                    pipeline_run, pipelines_path, templates_path, datasets_path)
            except Exception as e:
                LOGGER.warning('Failed %s with: %s', pipeline_run_path, e)
                continue

            # NOTE(review): only the last successfully-extracted run's status
            # decides whether the whole file is discarded (as in the original).
            failed = run_status == 'FAILURE'
            data_extracted.append(run_data)

        if failed:
            LOGGER.warning('Pipeline run %s discarded.', pipeline_run_path)
            discarded.append(data_extracted)
        else:
            results.extend(data_extracted)

    return results, discarded
| 238 | + |
| 239 | + |
def apply_mean_score(df):
    """Return *df* with a ``mean_score`` column added.

    ``mean_score`` is the mean ``normalized_score`` over each
    ``(pipeline_id, context)`` group, left-merged back onto every row.
    """
    means = (
        df.groupby(['pipeline_id', 'context'])['normalized_score']
        .mean()
        .reset_index()
        .rename(columns={'normalized_score': 'mean_score'})
    )
    return df.merge(means, on=['pipeline_id', 'context'], how='left')
| 245 | + |
| 246 | + |
def z_score(x):
    """Standardize a Series to zero mean and unit (sample) std.

    Degenerate inputs — a single element or zero variance — yield an
    all-zeros Series on the same index instead of dividing by zero.
    """
    std = x.std()
    if len(x) == 1 or std == 0:
        return pd.Series(np.zeros(len(x)), index=x.index)

    return (x - x.mean()) / std
| 252 | + |
| 253 | + |
def apply_z_score(df):
    """Merge a per-template mean z-score onto *df* as a ``z_score`` column.

    Scores are standardized within each ``dataset`` group (see ``z_score``)
    so that templates evaluated on easy and hard datasets become comparable,
    then averaged per ``template_id`` and left-merged back onto every row.
    """
    # NOTE(review): assumes groupby(...).apply(z_score) returns a Series
    # aligned with df's index — confirm against the pandas version in use.
    z_scores = df.groupby('dataset').normalized_score.apply(z_score)
    df['z_score'] = z_scores
    templates_z_score = df.groupby('template_id').z_score.mean()
    # Drop the temporary per-row column; only the per-template mean survives.
    del df['z_score']

    # The merged Series keeps the name 'z_score', which becomes the column name.
    return df.merge(templates_z_score, how='left', left_on='template_id', right_index=True)
| 261 | + |
| 262 | + |
def generate_metadata_report(pipeline_runs, pipelines_path, templates_path, datasets_path, report):
    """Generate templates from pipeline runs and dump a CSV scoring report.

    Extracts per-run metadata, adds mean and z-score columns, and writes the
    result to *report* (defaulting to ``<templates_path>/templates.csv``).
    Discarded pipeline-run files, if any, are recorded in ``errors.txt``.
    """
    results, discarded = extract_meta_information(
        pipeline_runs, pipelines_path, templates_path, datasets_path)

    if report is None:
        report = os.path.join(templates_path, 'templates.csv')

    df = pd.DataFrame(results)
    df = apply_mean_score(df)
    df = apply_z_score(df)
    df.to_csv(report, index=False)

    # Fix: the original tested an undefined name ``errored`` here, raising
    # NameError on every call; record the discarded runs returned by
    # extract_meta_information instead.
    if discarded:
        with open('errors.txt', 'w') as f:
            for entry in discarded:
                f.write('{}\n'.format(entry))
| 280 | + |
| 281 | + |
def get_parser():
    """Build the command-line argument parser for this script.

    Fix: the original returned ``parser.parse_args()`` — eagerly parsing
    ``sys.argv`` — despite the name promising a parser; return the parser.
    """
    parser = argparse.ArgumentParser(
        description='Generate new templates from pipeline runs and the metadata reffered to them.')
    parser.add_argument('pipeline_runs_path', help='Path to the pipeline runs folder')
    parser.add_argument('pipelines_path', help='Path to the pipelines folder')
    parser.add_argument('templates_path', help='Path to the templates folder')
    parser.add_argument('datasets_path', help='Path where the datasets are located')
    parser.add_argument('-r', '--report', help='Path to the CSV file where scores will be dumped.')

    return parser


def main():
    """Entry point: parse CLI arguments and generate the metadata report."""
    # Fix: the original called undefined ``parse_args()`` (NameError) and
    # accessed ``args.pipelines_scored_path``, an attribute the parser never
    # defines — the declared argument is ``pipelines_path``.
    args = get_parser().parse_args()
    generate_metadata_report(
        args.pipeline_runs_path,
        args.pipelines_path,
        args.templates_path,
        args.datasets_path,
        args.report,
    )


if __name__ == '__main__':
    main()