@@ -175,6 +175,56 @@ def ingest_artifacts(self, collection, isVerbose=False):
175175 """
176176 artifacts = collection
177177
178+ self .cur .execute ("BEGIN TRANSACTION" )
179+
180+ if self .list () is not None and list (artifacts .keys ()) == ["dsi_relations" ]:
181+ pk_list = artifacts ["dsi_relations" ]["primary_key" ]
182+ fk_list = artifacts ["dsi_relations" ]["foreign_key" ]
183+ pk_tables = set (t [0 ] for t in pk_list )
184+ fk_tables = set (t [0 ] for t in fk_list if t [0 ] != None )
185+ all_schema_tables = pk_tables .union (fk_tables )
186+ db_tables = [t [0 ] for t in self .list () if t [0 ] != "dsi_units" ]
187+
188+ # check if tables from dsi_relations are all in the db
189+ if all_schema_tables .issubset (set (db_tables )):
190+ circ , _ = self .check_table_relations (all_schema_tables , artifacts ["dsi_relations" ])
191+ if circ :
192+ return (ValueError , f"A complex schema with a circular dependency cannot be ingested into a DuckDB backend." )
193+
194+ drop_order = all_schema_tables
195+ collect = self .process_artifacts ()
196+ if "dsi_relations" in collect .keys ():
197+ curr_pk_tables = set (t [0 ] for t in collect ["dsi_relations" ]["primary_key" ])
198+ curr_fk_tables = set (t [0 ] for t in collect ["dsi_relations" ]["foreign_key" ] if t [0 ] != None )
199+ curr_schema_tables = curr_pk_tables .union (curr_fk_tables )
200+
201+ # need to drop and reingest all tables in old schema and new schema
202+ all_schema_tables = all_schema_tables .union (curr_schema_tables )
203+
204+ _ , ord_tables1 = self .check_table_relations (all_schema_tables , collect ["dsi_relations" ])
205+ drop_order = ord_tables1
206+
207+ for table in drop_order :
208+ self .cur .execute (f'DROP TABLE IF EXISTS "{ table } ";' )
209+ try :
210+ self .con .commit ()
211+ except Exception as e :
212+ self .cur .execute ("ROLLBACK" )
213+ self .cur .execute ("CHECKPOINT" )
214+ return (duckdb .Error , e )
215+
216+ # Skip re-ingesting tables that belong to neither the old nor the new schema; their contents stay the same
217+ non_schema_tables = set (db_tables ) - all_schema_tables
218+ for t in non_schema_tables :
219+ del collect [t ]
220+
221+ collect ["dsi_relations" ] = artifacts ["dsi_relations" ]
222+ artifacts = collect
223+
224+ else :
225+ print ("WARNING: Complex schemas can only be ingested if all referenced data tables are loaded into DSI." )
226+
227+
178228 table_order = artifacts .keys ()
179229 if "dsi_relations" in artifacts .keys ():
180230 circular , ordered_tables = self .check_table_relations (artifacts .keys (), artifacts ["dsi_relations" ])
@@ -184,10 +234,8 @@ def ingest_artifacts(self, collection, isVerbose=False):
184234 else :
185235 table_order = list (reversed (ordered_tables )) # ingest primary key tables first then children
186236
187- self .cur .execute ("BEGIN TRANSACTION" )
188237 if self .runTable :
189- runTable_create = "CREATE TABLE IF NOT EXISTS runTable " \
190- "(run_id INTEGER PRIMARY KEY, run_timestamp TEXT UNIQUE);"
238+ runTable_create = "CREATE TABLE IF NOT EXISTS runTable (run_id INTEGER PRIMARY KEY, run_timestamp TEXT UNIQUE);"
191239 self .cur .execute (runTable_create )
192240
193241 sequence_run_id = "CREATE SEQUENCE IF NOT EXISTS seq_run_id START 1;"
@@ -387,13 +435,16 @@ def notebook(self, interactive=False):
387435 def read_to_artifact (self ):
388436 return self .process_artifacts ()
389437
390- def process_artifacts (self ):
438+ def process_artifacts (self , only_units_relations = False ):
391439 """
392440 Reads data from the DuckDB database into a nested OrderedDict.
393441 Keys are table names, and values are OrderedDicts containing table data.
394442
395443 If the database contains PK/FK relationships, they are stored in a special `dsi_relations` table.
396444
445+ `only_units_relations` : bool, default=False
446+ **USERS SHOULD IGNORE THIS FLAG.** Used internally by duckdb.py. When True, the per-table column and row data are not read.
447+
397448 `return` : OrderedDict
398449 A nested OrderedDict containing all data from the DuckDB database.
399450 """
@@ -404,20 +455,22 @@ def process_artifacts(self):
404455 SELECT table_name FROM information_schema.tables
405456 WHERE table_schema = 'main' AND table_type = 'BASE TABLE'
406457 """ ).fetchall ()
407- for item in tableList :
408- tableName = self .duckdb_compatible_name (item [0 ])
409458
410- tableInfo = self .cur .execute (f"PRAGMA table_info({ tableName } );" ).fetchdf ()
411- colDict = OrderedDict ((self .duckdb_compatible_name (col ), []) for col in tableInfo ['name' ])
459+ if only_units_relations == False :
460+ for item in tableList :
461+ tableName = self .duckdb_compatible_name (item [0 ])
462+
463+ tableInfo = self .cur .execute (f"PRAGMA table_info({ tableName } );" ).fetchdf ()
464+ colDict = OrderedDict ((self .duckdb_compatible_name (col ), []) for col in tableInfo ['name' ])
412465
413- data = self .cur .execute (f"SELECT * FROM { tableName } ;" ).fetchall ()
414- for row in data :
415- for colName , val in zip (colDict .keys (), row ):
416- if val == "NULL" :
417- colDict [colName ].append (None )
418- else :
419- colDict [colName ].append (val )
420- artifact [tableName ] = colDict
466+ data = self .cur .execute (f"SELECT * FROM { tableName } ;" ).fetchall ()
467+ for row in data :
468+ for colName , val in zip (colDict .keys (), row ):
469+ if val == "NULL" :
470+ colDict [colName ].append (None )
471+ else :
472+ colDict [colName ].append (val )
473+ artifact [tableName ] = colDict
421474
422475 pk_list = []
423476 fkData = self .cur .execute (f"""
@@ -743,6 +796,8 @@ def list(self):
743796 SELECT table_name FROM information_schema.tables
744797 WHERE table_schema = 'main' AND table_type = 'BASE TABLE'
745798 """ ).fetchall ()
799+ if not tableList :
800+ return None
746801 tableList = [self .duckdb_compatible_name (table [0 ]) for table in tableList ]
747802
748803 info_list = []
@@ -839,12 +894,13 @@ def summary_helper(self, table_name):
839894 col_info = self .cur .execute (f"PRAGMA table_info({ table_name } )" ).fetchall ()
840895
841896 numeric_types = {'INTEGER' , 'REAL' , 'FLOAT' , 'NUMERIC' , 'DECIMAL' , 'DOUBLE' , 'BIGINT' }
842- headers = ['column' , 'type' , 'min' , 'max' , 'avg' , 'std_dev' ]
897+ headers = ['column' , 'type' , 'unique' , ' min' , 'max' , 'avg' , 'std_dev' ]
843898 rows = []
844899
845900 for col in col_info :
846901 col_name = col [1 ]
847902 col_type = col [2 ].upper ()
903+ unique_vals = self .cur .execute (f"SELECT COUNT(DISTINCT { col_name } ) FROM { table_name } ;" ).fetchone ()[0 ]
848904 is_primary = col [5 ] > 0
849905 display_name = f"{ col_name } *" if is_primary else col_name
850906
@@ -863,7 +919,7 @@ def summary_helper(self, table_name):
863919
864920 if avg_val != None and std_dev == None :
865921 std_dev = 0
866- rows .append ([display_name , col_type , min_val , max_val , avg_val , std_dev ])
922+ rows .append ([display_name , col_type , unique_vals , min_val , max_val , avg_val , std_dev ])
867923
868924 return headers , rows
869925
@@ -1007,7 +1063,7 @@ def visit(node):
10071063 if any (visit (node ) for node in list (graph .keys ())):
10081064 return True , None # Circular dependency detected
10091065
1010- # Step 3: Order tables from least dependencies to most (if no circular dependencies)
1066+ # Order tables from least dependencies to most (if no circular dependencies)
10111067 in_degree = {table : 0 for table in tables }
10121068 for child in graph :
10131069 for parent in graph [child ]:
0 commit comments