1
1
import datetime
2
2
import logging
3
+ import json
3
4
import pickle
4
5
import random
5
6
import uuid
@@ -745,6 +746,7 @@ def copy(
745
746
allow_jagged_rows : bool = True ,
746
747
quote : Optional [str ] = None ,
747
748
schema : Optional [List [dict ]] = None ,
749
+ convert_dict_columns_to_json : bool = True ,
748
750
** load_kwargs ,
749
751
):
750
752
"""
@@ -774,6 +776,8 @@ def copy(
774
776
template_table: str
775
777
Table name to be used as the load schema. Load operation will use the same
776
778
columns and data types as the template table.
779
+ convert_dict_columns_to_json: bool
780
+ If set to True, will convert any dict columns (which cannot by default be successfully loaded to BigQuery) to JSON strings
777
781
**load_kwargs: kwargs
778
782
Arguments to pass to the underlying load_table_from_uri call on the BigQuery
779
783
client.
@@ -796,6 +800,19 @@ def copy(
796
800
else :
797
801
csv_delimiter = ","
798
802
803
+ if convert_dict_columns_to_json :
804
+ # Convert dict columns to JSON strings
805
+ for field in tbl .get_columns_type_stats ():
806
+ if "dict" in field ["type" ]:
807
+ new_petl = tbl .table .addfield (
808
+ field ["name" ] + "_replace" , lambda row : json .dumps (row [field ["name" ]])
809
+ )
810
+ new_tbl = Table (new_petl )
811
+ new_tbl .remove_column (field ["name" ])
812
+ new_tbl .rename_column (field ["name" ] + "_replace" , field ["name" ])
813
+ new_tbl .materialize ()
814
+ tbl = new_tbl
815
+
799
816
job_config = self ._process_job_config (
800
817
job_config = job_config ,
801
818
destination_table_name = table_name ,
0 commit comments