diff --git a/tools/Gen2toFabricDW/Config.cs b/tools/Gen2toFabricDW/Config.cs new file mode 100644 index 0000000..298df10 --- /dev/null +++ b/tools/Gen2toFabricDW/Config.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Gen2toFabricDW +{ + public class Config + { + public string Gen2Connectionstring { get; set; } + public string fabricEndpoint { get; set; } + public string fabricWarehouse{ get; set; } + public List Tables { get; set; } + public string SQLCreateExternalTable { get; set; } + public string COPYINTO_Statement { get; set; } + public int batchsize { get; set; } + public string SPN_Application_ID { get; set; } + public string SPN_Secret { get; set; } + public string SPN_Tenant { get; set; } + public string abfslocation { get; set; } + public string httpslocation { get; set; } + public string SASKey { get; set; } + + } + + public class Table + { + public string Name { get; set; } + public string Schema { get; set; } + public string DropDestinationTable { get; set; } + public string CreateDestination { get; set; } + public string TruncateDestination { get; set; } + public string batchcolumn { get; set; } + public string externalTableSchema { get; set; } + } +} diff --git a/tools/Gen2toFabricDW/ExternalTableSetup.ssql b/tools/Gen2toFabricDW/ExternalTableSetup.ssql new file mode 100644 index 0000000..35d0bfa --- /dev/null +++ b/tools/Gen2toFabricDW/ExternalTableSetup.ssql @@ -0,0 +1,24 @@ +if not exists(select * from sys.external_file_formats where name='_extract_parquet') +begin + CREATE EXTERNAL FILE FORMAT _extract_parquet + WITH ( FORMAT_TYPE = PARQUET, DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec') +end + +--DROP DATABASE SCOPED CREDENTIAL [_extract_cred] +if not exists(SELECT name, create_date, modify_date FROM sys.database_scoped_credentials where name ='_extract_cred') +BEGIN + CREATE DATABASE SCOPED CREDENTIAL [_extract_cred] WITH IDENTITY='SHARED ACCESS SIGNATURE', SECRET = '{SASKey}' +END + +-- drop EXTERNAL DATA SOURCE [_extract_storage] +if not exists(SELECT name FROM sys.external_data_sources where name = '_extract_storage') +BEGIN + CREATE EXTERNAL DATA SOURCE [_extract_storage] WITH ( type=hadoop, LOCATION = N'{abfslocation}', CREDENTIAL = [_extract_cred]) +END + +/* +-- script to list all external tables +select 'drop external table [' + s.name + '].[' + t.name + ']' from sys.tables t inner join sys.schemas s on s.schema_id = t.schema_id +where t.is_external = 1 +*/ + diff --git a/tools/Gen2toFabricDW/Gen2toFabricDW.csproj b/tools/Gen2toFabricDW/Gen2toFabricDW.csproj new file mode 100644 index 0000000..6aecd2b --- /dev/null +++ b/tools/Gen2toFabricDW/Gen2toFabricDW.csproj @@ -0,0 +1,26 @@ + + + + Exe + net8.0 + enable + enable + + + + + + + + + Always + + + Always + + + Always + + + + diff --git a/tools/Gen2toFabricDW/Gen2toFabricDW.sln b/tools/Gen2toFabricDW/Gen2toFabricDW.sln new file mode 100644 index 0000000..9f5c8fd --- /dev/null +++ b/tools/Gen2toFabricDW/Gen2toFabricDW.sln @@ -0,0 +1,22 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.12.35707.178 d17.12 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Gen2toFabricDW", "Gen2toFabricDW.csproj", "{7A83E260-A325-4696-9D4D-9B8BCE75256C}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {7A83E260-A325-4696-9D4D-9B8BCE75256C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {7A83E260-A325-4696-9D4D-9B8BCE75256C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {7A83E260-A325-4696-9D4D-9B8BCE75256C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {7A83E260-A325-4696-9D4D-9B8BCE75256C}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection +EndGlobal diff --git a/tools/Gen2toFabricDW/Logging.cs b/tools/Gen2toFabricDW/Logging.cs new file mode 100644 index 0000000..1415844 --- /dev/null +++ b/tools/Gen2toFabricDW/Logging.cs @@ -0,0 +1,27 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Gen2toFabricDW +{ + public static class Logging + { + public static string locallogging = string.Empty; + public static void Log(string message, string logLevel = "INFO") + { + string logMessage = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} [{logLevel}] {message}"; + Console.WriteLine(logMessage); + try + { + File.AppendAllText(locallogging, logMessage + Environment.NewLine); + } + catch (Exception ex) + { + Console.WriteLine($"Failed to write log: {ex.Message}"); + } + } + + } +} diff --git a/tools/Gen2toFabricDW/Program.cs b/tools/Gen2toFabricDW/Program.cs new file mode 100644 index 0000000..0fd2bca --- /dev/null +++ b/tools/Gen2toFabricDW/Program.cs @@ -0,0 +1,268 @@ +using Gen2toFabricDW; +using Microsoft.Data.SqlClient; +using System.Collections.Generic; +using System.Text.Json; +class Program +{ + private static Config config = null; + static async Task Main(string[] args) + { + string configpath = System.IO.Path.GetDirectoryName(System.Reflection.Assembly.GetExecutingAssembly().Location); + Logging.locallogging = $"{configpath}\\logging.log"; + // string configpath = "C:\\temp"; + string wholepath = $"{configpath}\\config.json"; + string wholepat1h = $"{configpath}\\config.json"; + + string createTableSQL = ""; + string DropableSQL = ""; + string sElapseTime = ""; + // Read and parse configuration + config = LoadConfig(wholepath); + if (config == null) + { + Log("Failed to load configuration."); + return; + } + + string SQLScripts = File.ReadAllText($"{configpath}\\ExternalTableSetup.ssql"); + SQLScripts = ReplaceValues(null, "", SQLScripts, ""); + string sExtractTime = SQLServer.ExecuteNonQuery(config.Gen2Connectionstring, SQLScripts, true); + + + // loop through all the tables + foreach (var table in config.Tables) + { + string batchvalue = ""; + + string SQLSchemaScripts = File.ReadAllText($"{configpath}\\generatetableschema.ssql"); + SQLSchemaScripts = SQLSchemaScripts.Replace("{tableschemaname}", table.Schema); + SQLSchemaScripts = SQLSchemaScripts.Replace("{tablename}", table.Name); + + SQLServer.ExecuteRS(config.Gen2Connectionstring, SQLSchemaScripts, ref createTableSQL, ref DropableSQL, table); + + if(table.DropDestinationTable== "True") + { + sElapseTime = SQLServer.ExecuteFabric(config.fabricEndpoint, config.fabricWarehouse, DropableSQL); + Log($"Drop Destination Table: {sElapseTime}"); + } + + { + string sCreateExternalSchema = @"create schema {ss};"; + sCreateExternalSchema = sCreateExternalSchema.Replace("{ss}", table.externalTableSchema); + sExtractTime = SQLServer.ExecuteNonQuery(config.Gen2Connectionstring, sCreateExternalSchema, true); + + string sCreateSchema = @"create schema {s};"; + sCreateSchema = sCreateSchema.Replace("{s}", table.Schema); + sElapseTime = SQLServer.ExecuteFabric(config.fabricEndpoint, config.fabricWarehouse, sCreateSchema, true); + } + + if (table.CreateDestination == "True") + { + sElapseTime = SQLServer.ExecuteFabric(config.fabricEndpoint, config.fabricWarehouse, createTableSQL); + // Log($"Create Table SQL: {createTableSQL}"); + Log($"Create Destination Table: {sElapseTime}"); + } + + if (table.TruncateDestination == "True") + { + string sTruncateTable = $"truncate table {table.Schema}.{table.Name};"; + sElapseTime = SQLServer.ExecuteFabric(config.fabricEndpoint, config.fabricWarehouse, sTruncateTable); + // Log($"Truncate Table SQL: {sTruncateTable}"); + Log($"Truncate Destination Table: {sElapseTime}"); + } + + string sourceRowCount = ""; + string targetRowCount = ""; + + Boolean batchexportloop = true; + Boolean firstExport = true; + Boolean batchexport = false; + + if(table.batchcolumn.Length>0) + { batchexport = true; } + + while (batchexportloop) + { + if (batchexport == false) + { batchexportloop = false; } + + DeleteExternalTable(table); + + string batchcol = table.batchcolumn; + + string r = RandomName(); + string createExternaltable = config.SQLCreateExternalTable; + string ExternalTableSubfolder = "extracts/{r}/"; + createExternaltable = ReplaceValues(table, r, createExternaltable, ExternalTableSubfolder); + + if (batchexport == true) + { + createExternaltable = createExternaltable.Replace("{top}", $" top {config.batchsize} "); + + if (!firstExport) + { + createExternaltable = createExternaltable.Replace("{Whereclause}", " where {batchcolumn} > {colvalue} "); + + } + else + { + createExternaltable = createExternaltable.Replace("{Whereclause}", $" "); + } + createExternaltable = createExternaltable.Replace("{orderby}", " ORDER BY {batchcolumn} ASC "); + } + else + { + createExternaltable = createExternaltable.Replace("{top}", $" "); + createExternaltable = createExternaltable.Replace("{Whereclause}", $" "); + } + createExternaltable = createExternaltable.Replace("{top}", $" "); + createExternaltable = createExternaltable.Replace("{Whereclause}", $" "); + createExternaltable = createExternaltable.Replace("{orderby}", ""); + + //batchvalue + createExternaltable = createExternaltable.Replace("{batchcolumn}", table.batchcolumn); + createExternaltable = createExternaltable.Replace("{colvalue}", batchvalue); + // extract table + //Log($"Create External Table SQL: {createExternaltable}"); + Log("Extracting from Gen2.."); + sExtractTime = SQLServer.ExecuteNonQuery(config.Gen2Connectionstring, createExternaltable); + Logging.Log($"Extract data: RunTime {sExtractTime}"); + + string sSourceCount = "Select count(*) from {s}.{t}"; + sSourceCount = ReplaceValues(table, r, sSourceCount, ExternalTableSubfolder); + sourceRowCount = SQLServer.ExecuteScalar(config.Gen2Connectionstring, sSourceCount); + Logging.Log($"Extract data: Source Row Count {sourceRowCount}"); + + string copyinto = config.COPYINTO_Statement; + + /* copyinto = copyinto.Replace("{ExternalTableSubfolder}", ExternalTableSubfolder); + copyinto = copyinto.Replace("{r}", r); + copyinto = copyinto.Replace("{t}", table.Name); + copyinto = copyinto.Replace("{s}", table.Schema); + copyinto = copyinto.Replace("{ss}", table.externalTableSchema); + copyinto = copyinto.Replace("{httpslocation}", config.httpslocation); + copyinto = copyinto.Replace("{SASKey}", config.SASKey);*/ + + copyinto = ReplaceValues(table, r, copyinto, ExternalTableSubfolder); + + + Log("Starting Load into Fabric.."); + //Log($"Copy into SQL: {copyinto}"); + SQLServer.ExecuteCopyInto(config.SPN_Tenant, config.SPN_Application_ID, config.SPN_Secret, config.fabricEndpoint, config.fabricWarehouse, copyinto); + + string sDestCount = "Select count(*) from {s}.{t}"; + sDestCount = ReplaceValues(table, r, sDestCount, ExternalTableSubfolder); + targetRowCount = SQLServer.ExecuteScalarFabric(config.fabricEndpoint, config.fabricWarehouse, sDestCount); + Logging.Log($"Extract data: Destination Row Count {targetRowCount}"); + + if (batchexport == true) + { + string oldbatchvalue = batchvalue; + + string sCount = "select max({batchcolumn}) from {ss}.{t}"; + + sCount = ReplaceValues(table, "", sCount, ""); + batchvalue = SQLServer.ExecuteScalar(config.Gen2Connectionstring, sCount); + + if (batchvalue == oldbatchvalue) + { + batchexportloop = false; + break; + } + DeleteExternalTable(table); + } + firstExport = false; + } + } + } + + private static void DeleteExternalTable(Table table) + { + string sDropExternalTable = "if exists(select s.name,t.name from sys.tables t inner join sys.schemas s on s.schema_id = t.schema_id\r\nwhere s.name = '{ss}' and t.name ='{t}')\r\nbegin\r\n\tDROP EXTERNAL TABLE {ss}.{t}\r\nend\r\n"; + sDropExternalTable = ReplaceValues(table, "", sDropExternalTable, ""); + SQLServer.ExecuteNonQuery(config.Gen2Connectionstring, sDropExternalTable); + } + + private static string ReplaceValues(Table? table, string r, string createExternaltable, string ExternalTableSubfolder) + { + ExternalTableSubfolder = ExternalTableSubfolder.Replace("{r}", r); + createExternaltable = createExternaltable.Replace("{ExternalTableSubfolder}", ExternalTableSubfolder); + createExternaltable = createExternaltable.Replace("{r}", r); + if (table is not null) + { createExternaltable = createExternaltable.Replace("{t}", table.Name); + createExternaltable = createExternaltable.Replace("{s}", table.Schema); + createExternaltable = createExternaltable.Replace("{ss}", table.externalTableSchema); + createExternaltable = createExternaltable.Replace("{batchcolumn}", table.batchcolumn); + } + createExternaltable = createExternaltable.Replace("{httpslocation}", config.httpslocation); + createExternaltable = createExternaltable.Replace("{SASKey}", config.SASKey); + createExternaltable = createExternaltable.Replace("{abfslocation}", config.abfslocation); + + + return createExternaltable; + } + + public static string RandomName() + { + Random rand = new Random(); + + // Choosing the size of string + // Using Next() string + int stringlen = rand.Next(4, 10); + int randValue; + string str = ""; + char letter; + for (int i = 0; i < stringlen; i++) + { + + // Generating a random number. + randValue = rand.Next(0, 26); + + // Generating random character by converting + // the random number into character. + letter = Convert.ToChar(randValue + 65); + + // Appending the letter to string. + str = str + letter; + } + + return str; + } + static public T LoadConfig(string filePath) + { + try + { + if (File.Exists(filePath)) + { + string json = File.ReadAllText(filePath); + return JsonSerializer.Deserialize(json); + } + Log("File not found, returning default data."); + return default; + } + catch (Exception ex) + { + Log($"Error loading data: {ex.Message}"); + return default; + } + } + static public void SaveData(T data, string filePath) + { + lock (data) + { + try + { + string json = JsonSerializer.Serialize(data, new JsonSerializerOptions { WriteIndented = true }); + File.WriteAllText(filePath, json); + //Logging.Log("Data saved successfully."); + } + catch (Exception ex) + { + Log($"Error saving data: {ex.Message}"); + } + } + } + static void Log(string Message) { + Logging.Log(Message); + } +} \ No newline at end of file diff --git a/tools/Gen2toFabricDW/README.md b/tools/Gen2toFabricDW/README.md new file mode 100644 index 0000000..71a98a7 --- /dev/null +++ b/tools/Gen2toFabricDW/README.md @@ -0,0 +1,17 @@ +# Gen2 to Fabric DW +A utility to copy tables from a Synapse Dedicated SQL Pool / SQL DW Gen2 to Fabric DW. + +It does the following; +1. Extract the table schema from the Dedicated SQL pool. +1. Create the table on the Fabric DW. +1. Extract the data from the table using an external table to ADLS in parquet. +1. Run Copy into on Fabric Data Warehouse to import the parquet files into the Warehouse. + +## Details +The utility uses a SaS key to authenticate with ADLS so it can read and write to the storage account. +The performance of the extract is dependant the number of rows and the size of synapse data warehouse. +You the config file to pick which tables need to be copied. + +TODO: +1. Batching : If the table is very large, i.e. TB and the DWU is small, the table can be broken down into smaller batches. This reduces the CPU/memory overhead, but the table needs a column that can work as a high watermark. + diff --git a/tools/Gen2toFabricDW/SQLHelper.cs b/tools/Gen2toFabricDW/SQLHelper.cs new file mode 100644 index 0000000..fbc3878 --- /dev/null +++ b/tools/Gen2toFabricDW/SQLHelper.cs @@ -0,0 +1,267 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Text; +using System.Threading.Tasks; +using Azure.Identity; +using Microsoft.Data.SqlClient; + +namespace Gen2toFabricDW +{ + public static class SQLServer + { + static void Log(string Message) + { + Logging.Log(Message); + } + + public static SqlDataReader ExecuteRS(string connectionString, string query, ref string createTableSQL, ref string DropableSQL, Table t) + { + try + { + + SqlDataReader reader; + // Create and open a connection to SQL Server + using (SqlConnection connection = new SqlConnection(connectionString)) + { + connection.Open(); + //Log("Connection to SQL Server successful."); + + // Create a command object + using (SqlCommand command = new SqlCommand(query, connection)) + { + command.CommandTimeout = 0; + using (reader = command.ExecuteReader()) + { + + while (reader.Read()) + { + // Example: Access data by column index + + + if (reader[0].ToString().ToLower() == t.Schema.ToLower() + && reader[1].ToString().ToLower() == t.Name.ToLower()) + { + Log($"Schema: {reader[0]}, Table: {reader[1]}"); + createTableSQL = reader["Script"].ToString(); + DropableSQL = reader["DropStatement"].ToString(); + } + } + } + + } + } + return reader; + } + catch (Exception ex) + { + // Handle any errors that may have occurred + Log($"An error occurred: {ex.Message}"); + return null; + } + } + public static string ExecuteNonQuery(string connectionString, string query, Boolean ignoreerrors= false) + { + string elapsedTime = ""; + try + { + // Create and open a connection to SQL Server + using (SqlConnection connection = new SqlConnection(connectionString)) + { + + connection.Open(); + //Log("ExecuteNonQuery:Connection to SQL Server successful."); + + // Create a command object + using (SqlCommand command = new SqlCommand(query, connection)) + { + command.CommandTimeout = 0; + + // Log("Starting Extract..."); + + Stopwatch stopWatch = new Stopwatch(); + stopWatch.Start(); + command.ExecuteNonQuery(); + stopWatch.Stop(); + TimeSpan ts = stopWatch.Elapsed; + elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}", + ts.Hours, ts.Minutes, ts.Seconds, + ts.Milliseconds / 10); + // Log("Extract data: RunTime " + elapsedTime); + + } + } + } + catch (Exception ex) + { + // Handle any errors that may have occurred + if (!ignoreerrors) Log($"ExecuteNonQuery:An error occurred: {ex.Message}"); + } + return elapsedTime; + } + public static string ExecuteScalar(string connectionString, string query) + { + try + { + + string reader; + // Create and open a connection to SQL Server + using (SqlConnection connection = new SqlConnection(connectionString)) + { + connection.Open(); + //Log("Connection to SQL Server successful."); + + // Create a command object + using (SqlCommand command = new SqlCommand(query, connection)) + { + + command.CommandTimeout = 0; + reader = command.ExecuteScalar().ToString(); + } + } + return reader; + } + catch (Exception ex) + { + // Handle any errors that may have occurred + Log($"An error occurred: {ex.Message}"); + return string.Empty; + } + } + + public static void ExecuteCopyInto(string tenantId, string clientId, string clientSecret, string sqlWarehouseServer, string databaseName, string sql) + { + + + // string connectionString = $"Server={sqlWarehouseServer};Database={databaseName};Authentication=Active Directory Service Principal;Encrypt=True;TrustServerCertificate=False;"; + string connectionString = $"Server={sqlWarehouseServer};Database={databaseName};Authentication=ActiveDirectoryInteractive;Encrypt=True;TrustServerCertificate=False;"; + //var clientSecretCredential = new ClientSecretCredential(tenantId, clientId, clientSecret); + var defaultCredential = new DefaultAzureCredential(); + + try + { + // Get an Access Token + //string accessToken = clientSecretCredential.GetToken( + // new Azure.Core.TokenRequestContext(new[] { "https://database.windows.net/.default" }) + //).Token; + +// string accessToken = defaultCredential.GetToken( +// new Azure.Core.TokenRequestContext(new[] { "https://database.windows.net/.default" }) +//).Token; + + // Add the Access Token to the SQL connection + using (var connection = new SqlConnection(connectionString)) + { + //connection.AccessToken = accessToken; + + // Open the connection + connection.Open(); + //Console.WriteLine("Connection successful!"); + + // Execute a test query + using (var command = new SqlCommand(sql, connection)) + { + command.CommandTimeout = 0; + Stopwatch stopWatch = new Stopwatch(); + stopWatch.Start(); + command.ExecuteNonQuery(); + stopWatch.Stop(); + TimeSpan ts = stopWatch.Elapsed; + string elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}", + ts.Hours, ts.Minutes, ts.Seconds, + ts.Milliseconds / 10); + Log("ExecuteCopyInto: RunTime " + elapsedTime); + } + } + } + catch (Exception ex) + { + Log($"Error: {ex.Message}"); + } + } + + public static string ExecuteFabric( string sqlWarehouseServer, string databaseName, string sql, Boolean ignoreerrors= false) + { + string elapsedTime = ""; ; + + // string connectionString = $"Server={sqlWarehouseServer};Database={databaseName};Authentication=Active Directory Service Principal;Encrypt=True;TrustServerCertificate=False;"; + string connectionString = $"Server={sqlWarehouseServer};Database={databaseName};Authentication=ActiveDirectoryInteractive;Encrypt=True;TrustServerCertificate=False;"; + //var clientSecretCredential = new ClientSecretCredential(tenantId, clientId, clientSecret); + //var defaultCredential = new DefaultAzureCredential(); + + try + { + + // Add the Access Token to the SQL connection + using (var connection = new SqlConnection(connectionString)) + { + //connection.AccessToken = accessToken; + + // Open the connection + connection.Open(); + //Console.WriteLine("Connection successful!"); + + // Execute a test query + using (var command = new SqlCommand(sql, connection)) + { + command.CommandTimeout = 0; + Stopwatch stopWatch = new Stopwatch(); + stopWatch.Start(); + command.ExecuteNonQuery(); + stopWatch.Stop(); + TimeSpan ts = stopWatch.Elapsed; + elapsedTime = String.Format("{0:00}:{1:00}:{2:00}.{3:00}", + ts.Hours, ts.Minutes, ts.Seconds, + ts.Milliseconds / 10); + // Log("ExecuteCopyInto: RunTime " + elapsedTime); + } + } + } + catch (Exception ex) + { + if (!ignoreerrors) Log($"ExecuteFabric:An error occurred: {ex.Message}"); + } + + return elapsedTime; + } + public static string ExecuteScalarFabric(string sqlWarehouseServer, string databaseName, string sql, Boolean ignoreerrors = false) + { + string elapsedTime = ""; ; + + // string connectionString = $"Server={sqlWarehouseServer};Database={databaseName};Authentication=Active Directory Service Principal;Encrypt=True;TrustServerCertificate=False;"; + string connectionString = $"Server={sqlWarehouseServer};Database={databaseName};Authentication=ActiveDirectoryInteractive;Encrypt=True;TrustServerCertificate=False;"; + //var clientSecretCredential = new ClientSecretCredential(tenantId, clientId, clientSecret); + //var defaultCredential = new DefaultAzureCredential(); + + try + { + + // Add the Access Token to the SQL connection + using (var connection = new SqlConnection(connectionString)) + { + //connection.AccessToken = accessToken; + + // Open the connection + connection.Open(); + //Console.WriteLine("Connection successful!"); + + // Execute a test query + using (var command = new SqlCommand(sql, connection)) + { + command.CommandTimeout = 0; + elapsedTime = command.ExecuteScalar().ToString(); + // Log("ExecuteCopyInto: RunTime " + elapsedTime); + } + } + } + catch (Exception ex) + { + if (!ignoreerrors) Log($"ExecuteFabric:An error occurred: {ex.Message}"); + } + + return elapsedTime; + } + + } +} diff --git a/tools/Gen2toFabricDW/config.json b/tools/Gen2toFabricDW/config.json new file mode 100644 index 0000000..d23c8f1 --- /dev/null +++ b/tools/Gen2toFabricDW/config.json @@ -0,0 +1,22 @@ +{ + "Gen2Connectionstring": "Server=myGen2.database.windows.net;Database=mydatabase;User Id=extract_user;Pwd=extract_user_password;TrustServerCertificate=True;", + "fabricEndpoint": "fabric-dw-endpoint.datawarehouse.fabric.microsoft.com", + "fabricWarehouse": "fabricDataWarehouseName", + "SQLCreateExternalTable": "CREATE EXTERNAL TABLE {ss}.{t} WITH (\tLOCATION = '{ExternalTableSubfolder}',\tDATA_SOURCE = _extract_storage, FILE_FORMAT = _extract_parquet\t) AS SELECT {top} * FROM {s}.{t} {Whereclause} {orderby}", + "COPYINTO_Statement": "COPY INTO {s}.{t} FROM '{httpslocation}{ExternalTableSubfolder}' WITH ( FILE_TYPE = 'PARQUET', CREDENTIAL=(IDENTITY= 'Shared Access Signature', SECRET='{SASKey}'));", + "abfslocation": "abfss://container@storage-account.dfs.core.windows.net/", + "httpslocation": "https://storage-account.blob.core.windows.net/container/", + "SASKey": "-the SaS key-", + "batchsize": 60000, + "Tables": [ + { + "Schema": "schema", + "Name": "table-name", + "DropDestinationTable": "True", + "CreateDestination": "True", + "TruncateDestination": "False", + "batchcolumn": "", + "externalTableSchema": "ext" + } + ] +} \ No newline at end of file diff --git a/tools/Gen2toFabricDW/generatetableschema.ssql b/tools/Gen2toFabricDW/generatetableschema.ssql new file mode 100644 index 0000000..d6c8dd6 --- /dev/null +++ b/tools/Gen2toFabricDW/generatetableschema.ssql @@ -0,0 +1,148 @@ + + +IF (object_id('tempdb.dbo.#tbl','U') IS NOT NULL) DROP TABLE #tbl +create table #tbl with(distribution=round_robin,heap) as +select tbl.object_id,sc.name SchName, tbl.name tblName , c.column_id colid, c.name colname, t.name as coltype, c.max_length colmaxlength, c.precision colprecision, + c.scale colscale, c.is_nullable colnullable, c.collation_name +from sys.columns c + join sys.tables tbl on tbl.object_id=c.object_id + join sys.types t on t.user_type_id = c.user_type_id + inner join sys.schemas sc on tbl.schema_id=sc.schema_id + left join sys.default_constraints dc on c.default_object_id =dc.object_id and c.object_id =dc.parent_object_id + +/* + Extract Table Constraints +*/ +IF (object_id('tempdb.dbo.#tbl_constr','U') IS NOT NULL) DROP TABLE #tbl_constr +create table #tbl_constr with(distribution=round_robin,heap) as +SELECT ic.[object_id] + ,ic.[column_id] + ,kc.name + ,kc.type_desc + ,kc.is_enforced + ,c.name ColName + , c.is_nullable + , null definition + FROM [sys].[index_columns] ic + left join [sys].[key_constraints] kc on kc.parent_object_id=ic.object_id and kc.unique_index_id=ic.index_id + left join [sys].[default_constraints] dc on kc.parent_object_id=dc.parent_object_id + left join sys.columns c on c.column_id=ic.column_id and c.object_id=ic.object_id +UNION ALL +SELECT kc.[parent_object_id] object_id + ,kc.[parent_column_id] column_id + ,kc.[name] kcName + ,kc.[type_desc] + ,null is_enforced + ,c.name colName + ,c.is_nullable + ,kc.[definition] + FROM [sys].[default_constraints] kc + inner join sys.columns c on c.column_id=kc.parent_column_id and c.object_id=kc.parent_object_id + +create table #tbl_fin with(distribution=round_robin,heap) as +select t.*, tc.type_desc, tc.is_enforced, tc.definition from dbo.#tbl t +left join dbo.#tbl_constr tc on t.object_id=tc.object_id and t.colid=tc.column_id; + + +IF (object_id('tempdb.dbo.#tbl_Defs','U') IS NOT NULL) DROP TABLE #tbl_Defs +create table #tbl_Defs with(distribution=round_robin,heap) as +select SchName + ,tblName + ,'['+colname+']' as colname + ,colid + ,CASE WHEN type_desc = 'UNIQUE_CONSTRAINT' THEN CONCAT('['+colname+']',' ',NewTypeDef, ' ', colnullable,' /*UNIQUE',case when is_enforced=0 THEN ' NOT ENFORCED*/' END) + WHEN type_desc = 'PRIMARY_KEY_CONSTRAINT' THEN CONCAT('['+colname+']',' ',NewTypeDef,' ', colnullable,' /*PRIMARY KEY NONCLUSTERED',case when is_enforced=0 THEN ' NOT ENFORCED' END, '*/') + WHEN type_desc = 'DEFAULT_CONSTRAINT' THEN CONCAT('['+colname+']',' ',NewTypeDef,' ',colnullable,' /*DEFAULT',definition,'*/') + ELSE CONCAT('['+colname+']',' ',NewTypeDef,' ',colnullable) end NewColDef +from( +SELECT top 1000000000 SchName + ,tblName + ,colname + ,colid + ,coltype + ,case when colnullable = 1 THEN cast('NULL' as varchar) else cast('NOT NULL' as varchar) END colnullable + ,CASE WHEN coltype IN('numeric','decimal') THEN CONCAT(coltype,'(',colprecision,',',colscale,')') + WHEN coltype IN('money','smallmoney') THEN CONCAT('decimal','(',colprecision,',',colscale,')') + WHEN coltype in('smalldatetime','datetime','datetime2','datetimeoffset') THEN CONCAT('datetime2','(',case when colscale >6 then 6 else colscale end,')') + WHEN coltype IN('time') THEN CONCAT(coltype,'(',case when colscale > 6 then 6 else colscale end,')') + WHEN coltype IN('binary','varbinary','image') THEN 'varbinary' + when (coltype IN('char','varchar','text','ntext') and colmaxlength = -1) THEN CONCAT(coltype,'(8000)') + when (coltype IN('char','varchar','text') and colmaxlength != -1) THEN CONCAT(coltype,'(',colmaxlength,')') + when (coltype IN('ntext') and colmaxlength != -1) THEN CONCAT(coltype,'(',colmaxlength/2,')') + when (coltype IN('sysname')) THEN CONCAT('varchar','(8000)') + when (coltype IN('nchar') and colmaxlength = -1) THEN CONCAT('char','(8000)') + when (coltype IN('nvarchar') and colmaxlength = -1) THEN CONCAT('varchar','(8000)') + + when (coltype IN('nchar') and colmaxlength != -1) THEN CONCAT('char','(',colmaxlength/2,')') + when (coltype IN('nvarchar') and colmaxlength != -1) THEN CONCAT('varchar','(',colmaxlength/2,')') + + WHEN coltype IN('tinyint') THEN 'smallint' + + ELSE coltype END NewTypeDef + ,type_desc + ,is_enforced + ,definition + from #tbl_fin + order by colid +) a; + +DECLARE @STR_AGG_Var AS NVARCHAR(MAX) +SET @STR_AGG_Var = CAST(', ' AS NVARCHAR(MAX)) + +IF (object_id('tempdb.dbo.#tbl_FinalScript','U') IS NOT NULL) DROP TABLE #tbl_FinalScript; +create table #tbl_FinalScript (SchName nvarchar(200), objName varchar(200), Script nvarchar(max), DropStatement VARCHAR(1000)) with(distribution=round_robin,heap); +INSERT INTO #tbl_FinalScript + +select SchName,tblName, CAST(DDLScript AS NVARCHAR(MAX)) as Script, 'IF (object_id('''+ SchName + '.' + tblName + ''',''U'') IS NOT NULL) DROP TABLE ['+ SchName + '].[' + tblName +'];' as DropStatement +FROM ( +select SchName,tblName,CAST(concat(CAST('CREATE TABLE [' AS NVARCHAR(MAX)),CAST(SchName AS NVARCHAR(MAX)),CAST('].[' AS NVARCHAR(MAX)),CAST(tblName AS NVARCHAR(MAX)), + CAST('](' AS NVARCHAR(MAX)) ,CAST(STRING_AGG (CONVERT(NVARCHAR(max),NewColDef),', ') AS NVARCHAR(MAX)) + ,CAST(')' AS NVARCHAR(MAX))) AS NVARCHAR(MAX)) DDLScript +from #tbl_Defs t +group by SchName,tblName +UNION ALL +SELECT * from ( +select SchName + ,tblName + ,CASE WHEN type_desc = 'UNIQUE_CONSTRAINT' + THEN CONCAT('ALTER TABLE ',SchName,'.',tblName,' ADD CONSTRAINT ConstrUnique',tblName,' UNIQUE NONCLUSTERED(',colname,') NOT ENFORCED') + WHEN type_desc = 'PRIMARY_KEY_CONSTRAINT' + THEN CONCAT('ALTER TABLE ',SchName,'.',tblName,' ADD CONSTRAINT ConstrPK',tblName,' PRIMARY KEY NONCLUSTERED(',colname,') NOT ENFORCED') + WHEN type_desc = 'DEFAULT_CONSTRAINT' + THEN CONCAT('ALTER TABLE ',SchName,'.',tblName,' ADD DEFAULT ',replace(replace(definition,'))',')'),'((','('),' FOR ',colname) + end NewConstraintDef +from( +SELECT top 1000000000 SchName + ,tblName + ,'['+colname+']' as colname + ,colid + ,coltype + ,case when colnullable = 1 THEN cast('NULL' as varchar) else cast('NOT NULL' as varchar) END colnullable + ,CASE WHEN coltype IN('numeric','decimal') THEN CONCAT(coltype,'(',colprecision,',',colscale,')') + WHEN coltype IN('money','smallmoney') THEN CONCAT('decimal','(',colprecision,',',colscale,')') + WHEN coltype in('smalldatetime','datetime','datetime2','datetimeoffset') THEN CONCAT('datetime2','(',case when colscale >6 then 6 else colscale end,')') + WHEN coltype IN('time') THEN CONCAT(coltype,'(',case when colscale > 6 then 6 else colscale end,')') + WHEN coltype IN('binary','varbinary','image') THEN 'varbinary' + when (coltype IN('char','varchar','text','ntext') and colmaxlength = -1) THEN CONCAT(coltype,'(8000)') + when (coltype IN('char','varchar','text') and colmaxlength != -1) THEN CONCAT(coltype,'(',colmaxlength,')') + when (coltype IN('ntext') and colmaxlength != -1) THEN CONCAT(coltype,'(',colmaxlength/2,')') + when (coltype IN('sysname')) THEN CONCAT('varchar','(8000)') + when (coltype IN('nchar') and colmaxlength = -1) THEN CONCAT('char','(8000)') + when (coltype IN('nvarchar') and colmaxlength = -1) THEN CONCAT('varchar','(8000)') + + when (coltype IN('nchar') and colmaxlength != -1) THEN CONCAT('char','(',colmaxlength/2,')') + when (coltype IN('nvarchar') and colmaxlength != -1) THEN CONCAT('varchar','(',colmaxlength/2,')') + + WHEN coltype IN('tinyint') THEN 'smallint' + ELSE coltype END NewTypeDef + ,type_desc + ,is_enforced + ,definition + from #tbl_fin + order by colid +) a +) b +where NewConstraintDef is not null +) c; + +select * from #tbl_FinalScript where schName NOT IN ('INFORMATION_SCHEMA','sys','sysdiag','migration')