diff --git a/target_postgres/db_sync.py b/target_postgres/db_sync.py
index 4fba051..e6aef05 100644
--- a/target_postgres/db_sync.py
+++ b/target_postgres/db_sync.py
@@ -43,6 +43,17 @@ def column_clause(name, schema_property):
     return '{} {}'.format(safe_column_name(name), column_type(schema_property))
 
 
+def sanitize(value):
+    """Remove NUL characters from a string; pass other values through."""
+    if not isinstance(value, str):
+        return value
+
+    # A raw NUL character is serialized by json.dumps as the \u0000 escape,
+    # and that sequence will cause the CSV load to fail, so strip the raw
+    # character as well as any already-escaped literal sequence.
+    return value.replace("\x00", '').replace("\\u0000", '')
+
+
 def flatten_key(k, parent_key, sep):
     full_key = parent_key + [k]
     inflected_key = [inflect_column_name(n) for n in full_key]
@@ -60,18 +71,26 @@ def flatten_schema(d, parent_key=[], sep='__'):
     items = []
     for k, v in d['properties'].items():
         new_key = flatten_key(k, parent_key, sep)
+
+        # An empty definition carries no type information, so skip it.
+        if not v:
+            logger.warning("Empty definition for {}.".format(new_key))
+            continue
+
         if 'type' in v.keys():
             if 'object' in v['type']:
                 items.extend(flatten_schema(v, parent_key + [k], sep=sep).items())
             else:
                 items.append((new_key, v))
         else:
-            if list(v.values())[0][0]['type'] == 'string':
-                list(v.values())[0][0]['type'] = ['null', 'string']
-                items.append((new_key, list(v.values())[0][0]))
-            elif list(v.values())[0][0]['type'] == 'array':
-                list(v.values())[0][0]['type'] = ['null', 'array']
-                items.append((new_key, list(v.values())[0][0]))
+            # No top-level 'type': use the first element of the first value.
+            definition = list(v.values())[0][0]
+            if definition['type'] == 'string':
+                definition['type'] = ['null', 'string']
+                items.append((new_key, definition))
+            elif definition['type'] == 'array':
+                definition['type'] = ['null', 'array']
+                items.append((new_key, definition))
 
     key_func = lambda item: item[0]
     sorted_items = sorted(items, key=key_func)
@@ -150,7 +169,7 @@ def record_to_csv_line(self, record):
         flatten = flatten_record(record)
         return ','.join(
             [
-                json.dumps(flatten[name]) if name in flatten and flatten[name] else ''
+                json.dumps(sanitize(flatten[name])) if name in flatten and flatten[name] else ''
                 for name in self.flatten_schema
             ]
         )