diff --git a/target_postgres/__init__.py b/target_postgres/__init__.py index 15a12b8..fd010ee 100644 --- a/target_postgres/__init__.py +++ b/target_postgres/__init__.py @@ -13,13 +13,25 @@ from tempfile import TemporaryFile import pkg_resources -from jsonschema.validators import Draft4Validator +from jsonschema import Draft4Validator, FormatChecker +from decimal import Decimal import singer from target_postgres.db_sync import DbSync logger = singer.get_logger() +def float_to_decimal(value): + '''Walk the given data structure and turn all instances of float into + double.''' + if isinstance(value, float): + return Decimal(str(value)) + if isinstance(value, list): + return [float_to_decimal(child) for child in value] + if isinstance(value, dict): + return {k: float_to_decimal(v) for k, v in value.items()} + return value + def emit_state(state): if state is not None: line = json.dumps(state) @@ -65,7 +77,7 @@ def persist_lines(config, lines): stream = o['stream'] # Validate record - validators[stream].validate(o['record']) + validators[stream].validate(float_to_decimal(o['record'])) sync = stream_to_sync[stream] @@ -93,7 +105,8 @@ def persist_lines(config, lines): raise Exception("Line is missing required key 'stream': {}".format(line)) stream = o['stream'] schemas[stream] = o - validators[stream] = Draft4Validator(o['schema']) + schema = float_to_decimal(o['schema']) + validators[stream] = Draft4Validator(schema, format_checker=FormatChecker()) if 'key_properties' not in o: raise Exception("key_properties field is required") key_properties[stream] = o['key_properties']