diff --git a/accelerators/CICD/Branch-out-to-new-workspace/AzDO/Branch_out_workspace.yml b/accelerators/CICD/Branch-out-to-new-workspace/AzDO/Branch_out_workspace.yml
index ea6e885..1aeb9a2 100644
--- a/accelerators/CICD/Branch-out-to-new-workspace/AzDO/Branch_out_workspace.yml
+++ b/accelerators/CICD/Branch-out-to-new-workspace/AzDO/Branch_out_workspace.yml
@@ -9,37 +9,37 @@ parameters:
- name: source_workspace
displayName: Enter source workspace
type: string
- default: 'Dev_WS_CICDSample_3'
+ default: ''
- name: target_workspace
displayName: Enter target workspace name
type: string
- default: 'Dev_WS_CICDSample_Clone5'
+ default: ''
- name: copy_lakehouse_data
displayName: Copy Lakehouse Data (enter True or False)
type: string
- default: 'True'
+ default: 'False'
- name: copy_warehouse_data
- displayName: Copy Lakehouse Data (enter True or False)
+ displayName: Copy Warehouse Data (enter True or False)
type: string
default: 'False'
- name: create_lakehouse_shortcuts
displayName: Create lakehouse shortcuts (only if copy lakehouse data set to False)
type: string
- default: 'False'
+ default: 'True'
- name: developer_email
displayName: Enter developer email
type: string
- default: 'reportbuilder1@MngEnvMCAP553100.onmicrosoft.com'
+ default: ''
- name: capacity_id
displayName: Enter capacity ID of the new workspace
type: string
- default: 'B34D9528-0FF8-4E40-865D-8BA769F574BB'
+ default: ''
- name: ado_branch
@@ -47,10 +47,15 @@ parameters:
type: string
default: 'main'
+- name: ado_git_folder
+  displayName: Folder in the repo where the Fabric content is stored. Leave as / if the content is stored in the repo root.
+ type: string
+ default: "/"
+
- name: connections_from_to
- displayName: Swap connections in pipelines using names or IDs in the format (from,to) format
+  displayName: Swap connections in pipelines using names or IDs in the (from,to) format. Leave as () if there are no connections to swap.
type: string
- default: "('4498340c-27cf-4c6e-a025-00e5de6b0726','4498340c-27cf-4c6e-a025-00e5de6b0726')"
+ default: "()"
variables:
- group: Fabric_Deployment_Group_S
@@ -77,7 +82,7 @@ stages:
inputs:
scriptSource: 'filePath'
scriptPath: 'scripts/BranchOut-Feature-Workspace-Automation.py'
- arguments: '--ADO_ORG_NAME $(ADO_ORG_NAME) --ADO_REPO_NAME $(ADO_REPO_NAME) --ADO_PROJECT_NAME $(ADO_PROJECT_NAME) --ADO_NEW_BRANCH ${{ parameters.target_workspace}} --DEVELOPER ${{ parameters.developer_email }} --WORKSPACE_NAME ${{ parameters.target_workspace }} --CAPACITY_ID ${{ parameters.capacity_id }} --ADO_API_URL $(ADO_API_URL) --ADO_MAIN_BRANCH ${{ parameters.ado_branch }} --TENANT_ID $(TENANT_ID) --FABRIC_TOKEN $(fabrictoken) --ADO_PAT_TOKEN $(azdopat) --CLIENT_ID $(azclientid) --USER_NAME $(username) --PASSWORD $(password)'
+ arguments: '--ADO_ORG_NAME $(ADO_ORG_NAME) --ADO_REPO_NAME $(ADO_REPO_NAME) --ADO_PROJECT_NAME $(ADO_PROJECT_NAME) --ADO_NEW_BRANCH ${{ parameters.target_workspace}} --DEVELOPER ${{ parameters.developer_email }} --WORKSPACE_NAME ${{ parameters.target_workspace }} --CAPACITY_ID ${{ parameters.capacity_id }} --ADO_API_URL $(ADO_API_URL) --ADO_MAIN_BRANCH ${{ parameters.ado_branch }} --ADO_GIT_FOLDER ${{ parameters.ado_git_folder }} --TENANT_ID $(TENANT_ID) --FABRIC_TOKEN $(fabrictoken) --ADO_PAT_TOKEN $(azdopat) --CLIENT_ID $(azclientid) --USER_NAME $(username) --PASSWORD $(password)'
#failOnStderr: true
displayName: 'Run Branch-Out-To-New-Workspace Script'
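Note that copy_lakehouse_data and create_lakehouse_shortcuts are intended to be mutually exclusive (shortcuts are only created when the data copy is disabled), and all three copy flags travel as the strings 'True'/'False'. The pre-flight check below is a hypothetical sketch, not part of the repo's scripts, showing how those string values could be validated before they are handed on:

# Hypothetical validation of the string flags passed in as pipeline parameters.
def validate_flags(copy_lakehouse_data: str, create_lakehouse_shortcuts: str, copy_warehouse_data: str) -> None:
    flags = {
        "copy_lakehouse_data": copy_lakehouse_data,
        "create_lakehouse_shortcuts": create_lakehouse_shortcuts,
        "copy_warehouse_data": copy_warehouse_data,
    }
    for name, value in flags.items():
        if value not in ("True", "False"):
            raise ValueError(f"{name} must be the string 'True' or 'False', got {value!r}")
    if copy_lakehouse_data == "True" and create_lakehouse_shortcuts == "True":
        raise ValueError("Set at most one of copy_lakehouse_data and create_lakehouse_shortcuts to 'True'")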
diff --git a/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/BranchOut-Feature-Workspace-Automation.py b/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/BranchOut-Feature-Workspace-Automation.py
index 405bda3..fba84d3 100644
--- a/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/BranchOut-Feature-Workspace-Automation.py
+++ b/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/BranchOut-Feature-Workspace-Automation.py
@@ -18,6 +18,7 @@
DEVELOPER = ""
ADO_MAIN_BRANCH = ""
ADO_NEW_BRANCH = ""
+ADO_GIT_FOLDER = ""
ADO_PROJECT_NAME = ""
ADO_REPO_NAME = ""
ADO_ORG_NAME = ""
@@ -187,9 +188,9 @@ def get_branch_object_id(project_name, repo_name, branch_name, token, token_type
return None
# Function to connect Azure DevOps branch to Fabric workspace
-def connect_branch_to_workspace(workspace_id, project_name, org_name, repo_name, branch_name, token):
+def connect_branch_to_workspace(workspace_id, project_name, org_name, repo_name, branch_name, git_folder, token):
try:
- logging.info(f"Conecting workspace {workspace_id} to feature branch {branch_name} is in progess..")
+        logging.info(f"Connecting workspace {workspace_id} to feature branch {branch_name} at folder {git_folder}...")
headers = {"Authorization": f"Bearer {token}"}
data = {
"gitProviderDetails": {
@@ -198,7 +199,7 @@ def connect_branch_to_workspace(workspace_id, project_name, org_name, repo_name,
"gitProviderType": "AzureDevOps",
"repositoryName": repo_name,
"branchName": branch_name,
- "directoryName": ""
+ "directoryName": git_folder.rstrip("/")
}
}
response = requests.post(f"{FABRIC_API_URL}/workspaces/{workspace_id}/git/connect", headers=headers, json=data)
@@ -233,7 +234,7 @@ def long_running_operation_polling(uri,retry_after,headers):
def initialize_workspace_from_git(workspace_id,token):
try:
- logging.info(f"Connecting f{WORKSPACE_NAME} to feature branch {ADO_NEW_BRANCH} is in propress... ")
+        logging.info(f"Initialization of {WORKSPACE_NAME} from feature branch {ADO_NEW_BRANCH} is in progress... ")
headers = {"Authorization": f"Bearer {token}"}
# Initialize the connection to the GIT repository
gitinitializeurl = f"{FABRIC_API_URL}/workspaces/{workspace_id}/git/initializeConnection"
@@ -290,6 +291,7 @@ def set_main_parameters():
global DEVELOPER
global ADO_MAIN_BRANCH
global ADO_NEW_BRANCH
+ global ADO_GIT_FOLDER
global ADO_PROJECT_NAME
global ADO_REPO_NAME
global ADO_ORG_NAME
@@ -311,6 +313,7 @@ def set_main_parameters():
parser.add_argument('--WORKSPACE_NAME',type=str, help= 'Name of the feature workspace to be created')
parser.add_argument('--DEVELOPER',type=str, help= 'Developr UPN to be added to workspace as admin')
parser.add_argument('--ADO_MAIN_BRANCH',type=str, help= 'Main development branch')
+ parser.add_argument('--ADO_GIT_FOLDER',type=str, help= 'Folder where Fabric content is stored')
parser.add_argument('--ADO_NEW_BRANCH',type=str, help= 'New branch to be created')
parser.add_argument('--ADO_PROJECT_NAME',type=str, help= 'ADO project name')
parser.add_argument('--ADO_REPO_NAME',type=str, help= 'ADO repository name')
@@ -324,6 +327,7 @@ def set_main_parameters():
logging.error(f'Error: {e}')
raise ValueError("Could not extract parameters: {e}")
+ logging.info('Binding parameters...')
#Bind parameters to script variables
TENANT_ID = args.TENANT_ID
USERNAME = args.USER_NAME
@@ -332,6 +336,7 @@ def set_main_parameters():
DEVELOPER = args.DEVELOPER
ADO_MAIN_BRANCH = args.ADO_MAIN_BRANCH
ADO_NEW_BRANCH = args.ADO_NEW_BRANCH
+ ADO_GIT_FOLDER = args.ADO_GIT_FOLDER
ADO_PROJECT_NAME = args.ADO_PROJECT_NAME
ADO_REPO_NAME = args.ADO_REPO_NAME
ADO_ORG_NAME = args.ADO_ORG_NAME
@@ -364,10 +369,10 @@ def main():
logging.info(f'Workspace {WORKSPACE_NAME} ({workspace_id}) successfully created and assigned to capacity {CAPACITY_ID}')
logging.info(f'Adding workspace admins {DEVELOPER}...')
add_workspace_admins(workspace_id, DEVELOPER, token)
- logging.info(f'Creating ado branch from main {ADO_MAIN_BRANCH}...')
+ logging.info(f'Creating ado branch {ADO_NEW_BRANCH} from {ADO_MAIN_BRANCH}...')
create_azure_devops_branch(ADO_PROJECT_NAME, ADO_REPO_NAME, ADO_MAIN_BRANCH, ADO_NEW_BRANCH)
logging.info(f'Connecting workspace to branch {ADO_NEW_BRANCH}...')
- connect_branch_to_workspace(workspace_id, ADO_PROJECT_NAME, ADO_ORG_NAME,ADO_REPO_NAME, ADO_NEW_BRANCH, token)
+ connect_branch_to_workspace(workspace_id, ADO_PROJECT_NAME, ADO_ORG_NAME,ADO_REPO_NAME, ADO_NEW_BRANCH, ADO_GIT_FOLDER, token)
logging.info('Initialize workspace...')
initialize_workspace_from_git(workspace_id, token)
else:
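For context, the git/connect request that connect_branch_to_workspace now sends carries the new folder value with its trailing slash removed, so the default '/' resolves to an empty directoryName (the repo root, matching the previously hard-coded value). A minimal sketch of that body, with placeholder organization, project, repo, and branch names:

# Illustrative body for POST {FABRIC_API_URL}/workspaces/{workspace_id}/git/connect;
# only directoryName is derived here, the other values are placeholders.
git_folder = "/"                                   # pipeline parameter ado_git_folder
data = {
    "gitProviderDetails": {
        "organizationName": "contoso",             # placeholder
        "projectName": "FabricCICD",               # placeholder
        "gitProviderType": "AzureDevOps",
        "repositoryName": "fabric-content",        # placeholder
        "branchName": "Dev_WS_Feature_1",          # placeholder
        "directoryName": git_folder.rstrip("/"),   # "/" -> "" (root), "/src/fabric/" -> "/src/fabric"
    }
}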
diff --git a/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/Run_post_activity.py b/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/Run_post_activity.py
index a191070..9875d59 100644
--- a/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/Run_post_activity.py
+++ b/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/Run_post_activity.py
@@ -69,41 +69,54 @@ def acquire_token_user_id_password(tenant_id, client_id,user_name,password):
logging.error('Error: Token could not be obtained: '+str(result))
return access_token
-logging.info('Checking for supplied credentials...')
-if FABRIC_TOKEN!="":
- logging.info('Fabric token found...')
- token = FABRIC_TOKEN
-else:
- logging.info('User creds found, generating token...')
- token = acquire_token_user_id_password(TENANT_ID,CLIENT_ID,user_name,password)
-
-if token:
- if NOTEBOOK_ID == '':
- raise ValueError('Error: Could not execute notebook as no Notebook ID has been specified.')
+def main():
+ logging.info('Checking for supplied credentials...')
+ if FABRIC_TOKEN!="":
+ logging.info('Fabric token found...')
+ token = FABRIC_TOKEN
+ else:
+ logging.info('User creds found, generating token...')
+ token = acquire_token_user_id_password(TENANT_ID,CLIENT_ID,user_name,password)
+
+ if token:
+ if NOTEBOOK_ID == '':
+ raise ValueError('Error: Could not execute notebook as no Notebook ID has been specified.')
+
+ plurl = 'https://api.fabric.microsoft.com/v1/workspaces/'+WS_ID +'/items/'+NOTEBOOK_ID+'/jobs/instances?jobType=RunNotebook'
- plurl = 'https://api.fabric.microsoft.com/v1/workspaces/'+WS_ID +'/items/'+NOTEBOOK_ID+'/jobs/instances?jobType=RunNotebook'
+ headers = {
+ "Authorization": f"Bearer {token}",
+ "Content-Type": "application/json" # Set the content type based on your request
+ }
+ logging.info('Setting notebook parameters...')
+ payload_data = '{' \
+ '"executionData": {' \
+ '"parameters": {' \
+ '"_inlineInstallationEnabled": {"value": "True", "type": "bool"},' \
+ '"source_ws": {"value": "' + SOURCE_WS + '", "type": "string"},' \
+ '"copy_lakehouse_data": {"value": "' + COPY_LH + '", "type": "bool"},' \
+ '"create_lakehouse_shortcuts": {"value": "' + CREATE_SC + '", "type": "bool"},' \
+ '"copy_warehouse_data": {"value": "' + COPY_WH + '", "type": "bool"},' \
+ '"target_ws": {"value": "' + TARGET_WS + '", "type": "string"},' \
+ '"p_connections_from_to": {"value": "' + CONNECTIONS_FROM_TO + '", "type": "string"}' \
+ '}}}'
+ logging.info('Invoking Fabric notebook job...')
+ plresponse = requests.post(plurl, json=json.loads(payload_data), headers=headers)
+ #logging.info(str(plresponse.status_code) + ' - ' + plresponse.text)
+ if plresponse.status_code==202:
+ logging.info('Job invoked. Please check the Fabric monitoring hub to review the status of the job.')
+ else:
+ logging.error('An error occurred when trying to invoke job: ' + str(plresponse.status_code) + ' - ' + plresponse.text)
+ raise ValueError("Error invoking Fabric notebook. Please review the debug logs.")
- headers = {
- "Authorization": f"Bearer {token}",
- "Content-Type": "application/json" # Set the content type based on your request
- }
- logging.info('Setting notebook parameters...')
- payload_data = '{' \
- '"executionData": {' \
- '"parameters": {' \
- '"_inlineInstallationEnabled": {"value": "True", "type": "bool"},' \
- '"source_ws": {"value": "' + SOURCE_WS + '", "type": "string"},' \
- '"copy_lakehouse_data": {"value": "' + COPY_LH + '", "type": "bool"},' \
- '"create_lakehouse_shortcuts": {"value": "' + CREATE_SC + '", "type": "bool"},' \
- '"copy_warehouse_data": {"value": "' + COPY_WH + '", "type": "bool"},' \
- '"target_ws": {"value": "' + TARGET_WS + '", "type": "string"},' \
- '"p_connections_from_to": {"value": "' + CONNECTIONS_FROM_TO + '", "type": "string"}' \
- '}}}'
- logging.info('Invoking Fabric notebook job...')
- plresponse = requests.post(plurl, json=json.loads(payload_data), headers=headers)
- logging.info(str(plresponse.status_code) + ' - ' + plresponse.text)
-else:
- logging.error("Could not aquire token")
- raise ValueError("Could not generate authentication token. Please review the debug logs.")
+ else:
+        logging.error("Could not acquire token")
+ raise ValueError("Could not generate authentication token. Please review the debug logs.")
+
+
+if __name__ == "__main__":
+ logging.info('Starting Run_post_activity script...')
+ main()
+
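Because payload_data above is assembled by string concatenation, a stray quote or brace in any parameter value would produce invalid JSON before json.loads is even reached. The following is an illustrative equivalent only, reusing the script's existing variables, that builds the same RunNotebook executionData as a dict so the json layer handles all quoting:

# Sketch only: same executionData as the concatenated payload_data, built as a dict.
payload = {
    "executionData": {
        "parameters": {
            "_inlineInstallationEnabled": {"value": "True", "type": "bool"},
            "source_ws": {"value": SOURCE_WS, "type": "string"},
            "copy_lakehouse_data": {"value": COPY_LH, "type": "bool"},
            "create_lakehouse_shortcuts": {"value": CREATE_SC, "type": "bool"},
            "copy_warehouse_data": {"value": COPY_WH, "type": "bool"},
            "target_ws": {"value": TARGET_WS, "type": "string"},
            "p_connections_from_to": {"value": CONNECTIONS_FROM_TO, "type": "string"},
        }
    }
}
plresponse = requests.post(plurl, json=payload, headers=headers)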
diff --git a/accelerators/CICD/Branch-out-to-new-workspace/Fabric/Branch out to new workspace - Post Activity.ipynb b/accelerators/CICD/Branch-out-to-new-workspace/Fabric/Branch out to new workspace - Post Activity.ipynb
index 9fdd1eb..26ea005 100644
--- a/accelerators/CICD/Branch-out-to-new-workspace/Fabric/Branch out to new workspace - Post Activity.ipynb
+++ b/accelerators/CICD/Branch-out-to-new-workspace/Fabric/Branch out to new workspace - Post Activity.ipynb
@@ -1 +1 @@
-{"cells":[{"cell_type":"markdown","source":["##### Branch out to new workspace notebook - post activity\n","\n","After cloning a workspace, this notebook will reconfigure any references to the old workspace by rebinding them to the new workspace. \n","\n","For example, a pipeline referencing a warehouse, or the default lakehouse of a notebook.\n","\n","These post-activity tasks can be run after the [branch out to new workspace functionality](https://blog.fabric.microsoft.com/en-us/blog/introducing-new-branching-capabilities-in-fabric-git-integration) or the [custom AzDO script](https://github.com/microsoft/fabric-toolbox/blob/main/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/BranchOut-Feature-Workspace-Automation.py).\n","\n","Summary of post activities in order:\n","
\n","- Default lakehouses and warehouse are updated to local lakehouse/warehouses
\n","- Either creates shortcuts in local lakehouse back to tables in the source lakehouse, or copies the data from source lakehouse. Set via parameter below.
\n","- Copy warehouse data. Set via parameter below
\n","- Changes directlake semantic model connections for semantic models to \"local\" lakehouse/warehouse
\n","- Rebinds reports to \"local\" semantic models
\n","- Changes pipeline lakehouse/warehouse references to local item
\n","- Ability to swap connections in pipelines from old to new
\n","- Commit changes to git
\n","
\n","\n","Requirements:\n","\n","- Requires Semantic Link Labs installed by pip install below or added to environment library.
\n","- Requires JmesPath library for data pipeline JSON manipulation i.e. connection swaps.
\n","
\n","\n","Limitations of current script:\n","\n","\n","- Does not recreate item shares or external shortcuts
\n","- Does not re-apply lakehouse SQL Endpoint or Warehouse object/row/column level security
\n","- Does not recreate data access roles in Lakehouse
\n","- Untested with Lakehouses with schema support enabled
\n","
\n","\n","\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"a98b6d0a-7a36-4116-ab0d-aa70144eb737"},{"cell_type":"markdown","source":["##### Install semantic link labs to support advanced functionality\n","\n","https://semantic-link-labs.readthedocs.io/en/latest/index.html\n","https://github.com/microsoft/semantic-link-labs/blob/main/README.md\n","\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"3b887bd6-a9c9-430f-b58f-b58a93f5ce29"},{"cell_type":"code","source":["%pip -q install semantic-link-labs\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":true},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"1b03316d-c088-4a0e-a2f0-44d45d112121"},{"cell_type":"markdown","source":["##### Install Jmespath to make data pipeline changes such as updating linked notebooks, warehouses and lakehouses "],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"8a74ed11-dd64-43bb-a735-906a947c8666"},{"cell_type":"code","source":["%pip install jmespath"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"c68be6ba-7648-457f-af82-f1987d12d7f7"},{"cell_type":"markdown","source":["##### Set parameters\n","Before running this notebook ensure these parameters are set correctly. If necessary these can be passed in via a data factory pipeline"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"dee81614-b92b-4242-890a-b11f97b1a640"},{"cell_type":"code","source":["source_ws = ''\n","target_ws = ''\n","\n","# Either copy lakehouse data or create shortcuts, set at most one of these to True \n","copy_lakehouse_data = True\n","create_lakehouse_shortcuts = False\n","\n","# Option to copy warehouse data if required\n","copy_warehouse_data = True\n","\n","# If false then shortcuts will be created. 
If you wish to create shortcuts based on a pattern match please set the param below\n","# enter pattern match for creating shortcuts - see https://github.com/arasdk/fabric-code-samples/blob/main/shortcuts/fabric_shortcut_creator.py \n","PATTERN_MATCH = [\"*\"]\n","_inlineInstallationEnabled = True\n","\n","# Set connections to be replaced from previous name or ID to new name or ID.\n","p_connections_from_to = ()#('https://api.fabric.microsoft.com/v1/workspaces/ admin','4498340c-27cf-4c6e-a025-00e5de6b0726'),('4498340c-27cf-4c6e-a025-00e5de6b0726','https://api.fabric.microsoft.com/v1/workspaces/ admin'),('https://api.fabric.microsoft.com/v1/workspaces/ admin','4498340c-27cf-4c6e-a025-00e5de6b0726')"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"tags":["parameters"]},"id":"90efaa4f-846d-4924-900e-258837a3467d"},{"cell_type":"markdown","source":["##### Library imports and fabric rest client setup\n","\n","https://learn.microsoft.com/en-us/python/api/semantic-link-sempy/sempy.fabric.fabricrestclient"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"4fb01e1d-ec4e-4c69-b544-66f6d8c5a475"},{"cell_type":"code","source":["import pandas as pd\n","import datetime, time\n","import re,json, fnmatch,os\n","import requests, base64\n","import sempy\n","import sempy.fabric as fabric\n","from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n","from pyspark.sql import DataFrame\n","from pyspark.sql.functions import col,current_timestamp,lit\n","import sempy_labs as labs\n","from sempy_labs import migration, directlake\n","from sempy_labs import lakehouse as lake\n","from sempy_labs import report as rep\n","from sempy_labs.tom import connect_semantic_model\n","\n","# instantiate the Fabric rest client\n","client = fabric.FabricRestClient()\n","\n","# get the current workspace ID based on the context of where this notebook is run from\n","thisWsId = notebookutils.runtime.context['currentWorkspaceId']\n","thisWsName = notebookutils.runtime.context['currentWorkspaceName']\n","\n","source_ws_id = fabric.resolve_workspace_id(source_ws)\n","target_ws_id = fabric.resolve_workspace_id(target_ws)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"391624c1-b299-452d-9ebf-f32626d49970"},{"cell_type":"markdown","source":["##### Update default and attached lakehouses/warehouses for notebooks\n","\n","Update notebook dependencies based on but now supports T-SQL notebooks:\n","https://github.com/PowerBiDevCamp/FabConWorkshopSweden/blob/main/DemoFiles/GitUpdateWorkspace/updateWorkspaceDependencies_v1.ipynb\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"aaae8a08-588d-4dd8-9d2c-2200b7a88d30"},{"cell_type":"code","source":["for notebook in notebookutils.notebook.list(workspaceId=target_ws_id):\n"," updates = False\n"," if notebook.displayName == 'ETL':#True: #notebook.displayName == 'T-SQL_Notebook': #notebook.displayName != 'Create Feature Branch':\n","\n"," # Get the current notebook definition\n"," json_payload = 
json.loads(notebookutils.notebook.getDefinition(notebook.displayName,workspaceId=source_ws_id))\n"," #print(json.dumps(json_payload, indent=4))\n"," # Check for any attached lakehouses\n"," if 'dependencies' in json_payload['metadata'] \\\n"," and 'lakehouse' in json_payload['metadata']['dependencies'] \\\n"," and json_payload['metadata'][\"dependencies\"][\"lakehouse\"] is not None:\n"," # Extract attached and default lakehouses\n"," current_lakehouse = json_payload['metadata']['dependencies']['lakehouse']\n"," # if default lakehouse setting exists\n"," if 'default_lakehouse_name' in current_lakehouse:\n"," print(f\"Updating notebook {notebook.displayName} with new default lakehouse: {current_lakehouse['default_lakehouse_name']} in workspace {target_ws}\")\n"," source_lh_name = fabric.resolve_item_name(item_id = current_lakehouse['default_lakehouse'],type='Lakehouse',workspace=source_ws_id)\n"," current_lakehouse['default_lakehouse'] = fabric.resolve_item_id(item_name = source_lh_name,type='Lakehouse',workspace=target_ws_id)\n"," current_lakehouse['default_lakehouse_workspace_id'] = target_ws_id\n"," updates = True\n"," # loop through all attached lakehouess\n"," for lakehouse in json_payload['metadata']['dependencies']['lakehouse']['known_lakehouses']:\n"," source_lh_id = lakehouse['id']\n"," # find source lakehouse name\n"," source_lh_name = fabric.resolve_item_name(item_id = lakehouse['id'],type='Lakehouse',workspace=source_ws_id)\n"," # find target lakehouse id based on name\n"," target_lh_id = fabric.resolve_item_id(item_name = source_lh_name,type='Lakehouse',workspace=target_ws_id)\n"," lakehouse['id'] = target_lh_id\n"," print(f'Updating attached lakehouse {source_lh_name} from {source_lh_id} to target ID {target_lh_id}')\n"," updates = True\n","\n"," if 'dependencies' in json_payload['metadata'] and 'warehouse' in json_payload['metadata']['dependencies']:\n"," # Fetch existing details\n"," current_warehouse = json_payload['metadata']['dependencies']['warehouse']\n"," current_warehouse_id = current_warehouse['default_warehouse']\n"," source_wh_name = fabric.resolve_item_name(item_id = current_warehouse_id,workspace=source_ws_id)\n"," #print('Source warehouse name is ' + source_wh_name)\n"," target_wh_id = fabric.resolve_item_id(item_name = source_wh_name,type='Warehouse',workspace=target_ws_id)\n","\n"," if 'default_warehouse' in current_warehouse:\n"," #json_payload['metadata']['dependencies']['warehouse'] = {}\n"," print(f\"Attempting to update notebook {notebook.displayName} with new default warehouse: {target_wh_id} in {target_ws}\")\n"," \n"," json_payload['metadata']['dependencies']['warehouse']['default_warehouse'] = target_wh_id\n"," for warehouse in json_payload['metadata']['dependencies']['warehouse']['known_warehouses']:\n"," if warehouse['id'] == current_warehouse_id:\n"," warehouse['id'] = target_wh_id\n"," updates = True\n","\n"," if updates:\n"," notebookutils.notebook.updateDefinition(\n"," name = notebook.displayName,\n"," content = json.dumps(json_payload),\n"," workspaceId = target_ws_id\n"," )\n"," \n"," print(f\"Updated notebook {notebook.displayName} in {target_ws}\")\n","\n"," else:\n"," print(f'No default lakehouse set for notebook {notebook.displayName}, 
ignoring.')"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"5c60b5d2-f83c-46f8-9870-9fd609166b67"},{"cell_type":"markdown","source":["##### Run the below cell - contains utility functions to support lakehouse and warehouse initialisation\n","\n","Shortcut creator:\n","https://github.com/arasdk/fabric-code-samples/blob/main/shortcuts/fabric_shortcut_creator.py "],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"a47a56df-219d-4d6c-b950-491909638deb"},{"cell_type":"code","source":["##### \n","### Shortcut utility function \n","####\n","\n","# Extract workspace_id, item_id and path from a onelake URI\n","def extract_onelake_https_uri_components(uri):\n"," # Define a regular expression to match any string between slashes and capture the final path element(s) without the leading slash\n"," pattern = re.compile(r\"abfss://([^@]+)@[^/]+/([^/]+)/(.*)\")\n"," match = pattern.search(uri)\n"," if match:\n"," workspace_id, item_id, path = match.groups()\n"," return workspace_id, item_id, path\n"," else:\n"," return None, None, None\n","\n","\n","def is_valid_onelake_uri(uri: str) -> bool:\n"," workspace_id, item_id, path = extract_onelake_https_uri_components(uri)\n"," if \"abfss://\" not in uri or workspace_id is None or item_id is None or path is None:\n"," return False\n","\n"," return True\n","\n","\n","def get_last_path_segment(uri: str):\n"," path = uri.split(\"/\") # Split the entire URI by '/'\n"," return path[-1] if path else None\n","\n","\n","def is_delta_table(uri: str):\n"," delta_log_path = os.path.join(uri, \"_delta_log\")\n"," return mssparkutils.fs.exists(delta_log_path)\n","\n","\n","def get_onelake_shorcut(workspace_id: str, item_id: str, path: str, name: str):\n"," shortcut_uri = (\n"," f\"v1/workspaces/{workspace_id}/items/{item_id}/shortcuts/{path}/{name}\"\n"," )\n"," result = client.get(shortcut_uri).json()\n"," return result\n","\n","\n","def is_folder_matching_pattern(path: str, folder_name: str, patterns: []):\n"," if folder_name in patterns:\n"," return True\n"," else:\n"," for pattern in patterns:\n"," if fnmatch.fnmatch(folder_name, pattern):\n"," return is_delta_table(path)\n","\n"," return False\n","\n","\n","def get_matching_delta_tables_uris(uri: str, patterns: []) -> []:\n"," # Use a set to avoid duplicates\n"," matched_uris = set()\n"," files = mssparkutils.fs.ls(uri)\n"," folders = [item for item in files if item.isDir]\n","\n"," # Filter folders to only those that matches the pattern and is a delta table\n"," matched_uris.update(\n"," folder.path\n"," for folder in folders\n"," if is_folder_matching_pattern(folder.path, folder.name, patterns)\n"," )\n","\n"," return matched_uris\n","\n","\n","def create_onelake_shorcut(source_uri: str, dest_uri: str):\n"," src_workspace_id, src_item_id, src_path = extract_onelake_https_uri_components(\n"," source_uri\n"," )\n","\n"," dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(\n"," dest_uri\n"," )\n","\n"," name = get_last_path_segment(source_uri)\n"," dest_uri_joined = os.path.join(dest_uri, name)\n","\n"," # If the destination path already exists, return without creating shortcut\n"," if mssparkutils.fs.exists(dest_uri_joined):\n"," print(f\"Destination already exists: {dest_uri_joined}\")\n"," return None\n","\n"," request_body = 
{\n"," \"name\": name,\n"," \"path\": dest_path,\n"," \"target\": {\n"," \"oneLake\": {\n"," \"itemId\": src_item_id,\n"," \"path\": src_path,\n"," \"workspaceId\": src_workspace_id,\n"," }\n"," },\n"," }\n","\n"," shortcut_uri = f\"v1/workspaces/{dest_workspace_id}/items/{dest_item_id}/shortcuts\"\n"," print(f\"Creating shortcut: {shortcut_uri}/{name}..\")\n"," try:\n"," client.post(shortcut_uri, json=request_body)\n"," except FabricHTTPException as e:\n"," print(e)\n"," return None\n","\n"," return get_onelake_shorcut(dest_workspace_id, dest_item_id, dest_path, name)\n"," \n","\n","####\n","## Copy lakehouse and warehouse utility functions\n","####\n","\n","def get_lh_object_list(base_path,data_types = ['Tables', 'Files'])->pd.DataFrame:\n","\n"," '''\n"," Function to get a list of tables for a lakehouse\n"," adapted from https://fabric.guru/getting-a-list-of-folders-and-delta-tables-in-the-fabric-lakehouse\n"," This function will return a pandas dataframe containing names and abfss paths of each folder for Files and Tables\n"," '''\n"," #data_types = ['Tables', 'Files'] #for if you want a list of files and tables\n"," #data_types = ['Tables'] #for if you want a list of tables\n","\n"," df = pd.concat([\n"," pd.DataFrame({\n"," 'name': [item.name for item in notebookutils.fs.ls(f'{base_path}/{data_type}/')],\n"," 'type': data_type[:-1].lower() , \n"," 'src_path': [item.path for item in notebookutils.fs.ls(f'{base_path}/{data_type}/')],\n"," }) for data_type in data_types], ignore_index=True)\n","\n"," return df\n","\n","def get_wh_object_list(schema_list,base_path)->pd.DataFrame:\n","\n"," '''\n"," Function to get a list of tables for a warehouse by schema\n"," '''\n"," data_type = 'Tables'\n"," dfs = []\n","\n"," for schema_prefix in schema_list:\n"," if notebookutils.fs.exists(f'{base_path}/{data_type}/{schema_prefix}/'):\n"," items = notebookutils.fs.ls(f'{base_path}/{data_type}/{schema_prefix}/')\n"," if items: # Check if the list is not empty\n"," df = pd.DataFrame({\n"," 'schema': schema_prefix,\n"," 'name': [item.name for item in items],\n"," 'type': data_type[:-1].lower(),\n"," 'src_path': [item.path for item in items],\n"," })\n"," dfs.append(df)\n","\n"," if dfs: # Check if the list of dataframes is not empty\n"," df = pd.concat(dfs, ignore_index=True)\n"," else:\n"," df = pd.DataFrame() # Return an empty dataframe if no dataframes were created\n","\n"," return df\n","\n","def copy_lh_objects(table_list,workspace_src,workspace_tgt,lakehouse_src,lakehouse_tgt,fastcopy=True,usingIDs=False)->pd.DataFrame:\n"," # declare an array to keep the instrumentation\n"," cpresult = []\n"," # loop through all the tables to extract the source path \n"," for table in table_list.src_path:\n"," source = table\n"," destination = source.replace(f'abfss://{workspace_src}', f'abfss://{workspace_tgt}')\n"," if usingIDs:\n"," destination = destination.replace(f'{lakehouse_src}', f'{lakehouse_tgt}')\n"," else:\n"," destination = destination.replace(f'{lakehouse_src}.Lakehouse', f'{lakehouse_tgt}.Lakehouse')\n"," start_time = datetime.datetime.now()\n"," if notebookutils.fs.exists(destination):\n"," notebookutils.fs.rm(destination, True)\n"," if fastcopy:\n"," # use fastcopy util which is a python wrapper to azcopy\n"," notebookutils.fs.fastcp(source+'/*', destination+'/', True)\n"," else:\n"," notebookutils.fs.cp(source, destination, True)\n","\n"," # recording the timing and add it to the results list\n"," end_time = datetime.datetime.now()\n"," copyreslist = [source, destination, 
start_time.strftime(\"%Y-%m-%d %H:%M:%S\"), end_time.strftime(\"%Y-%m-%d %H:%M:%S\"), str((end_time - start_time).total_seconds())]\n"," cpresult.append(copyreslist)\n"," return pd.DataFrame(cpresult,columns =['source--------------------------------------','target--------------------------------------','start------------','end_time------------','elapsed seconds----'])\n","\n","def createDWrecoverypl(ws_id,pl_name = 'Recover_Warehouse_Data_From_DR'):\n"," client = fabric.FabricRestClient()\n","\n"," dfurl= \"v1/workspaces/\"+ ws_id + \"/items\"\n"," payload = { \n"," \"displayName\": pl_name, \n"," \"type\": \"DataPipeline\", \n"," \"definition\": { \n"," \"parts\": [ \n"," { \n"," \"path\": \"pipeline-content.json\", \n"," \"payload\": \"{
    "properties": {
        "activities": [
            {
                "name": "IterateSchemaTables",
                "type": "ForEach",
                "dependsOn": [],
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.tablesToCopy",
                        "type": "Expression"
                    },
                    "batchCount": 20,
                    "activities": [
                        {
                            "name": "CopyWarehouseTables",
                            "type": "Copy",
                            "dependsOn": [
                                {
                                    "activity": "Set table",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 2,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "DataWarehouseSource",
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None",
                                    "datasetSettings": {
                                        "annotations": [],
                                        "linkedService": {
                                            "name": "07a03006_d1b6_4a39_beb1_0bba2aaf5ff7",
                                            "properties": {
                                                "annotations": [],
                                                "type": "DataWarehouse",
                                                "typeProperties": {
                                                    "endpoint": "@pipeline().parameters.lakehouseConnStr",
                                                    "artifactId": "@pipeline().parameters.lakehouseId",
                                                    "workspaceId": "@pipeline().parameters.workspaceId"
                                                }
                                            }
                                        },
                                        "type": "DataWarehouseTable",
                                        "schema": [],
                                        "typeProperties": {
                                            "schema": "dbo",
                                            "table": {
                                                "value": "@concat(concat(item().schema,'_'),item().name)",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                },
                                "sink": {
                                    "type": "DataWarehouseSink",
                                    "allowCopyCommand": true,
                                    "tableOption": "autoCreate",
                                    "datasetSettings": {
                                        "annotations": [],
                                        "linkedService": {
                                            "name": "0c03123a_d312_46c4_a8e7_5b4cad8f12d7",
                                            "properties": {
                                                "annotations": [],
                                                "type": "DataWarehouse",
                                                "typeProperties": {
                                                    "endpoint": "@pipeline().parameters.warehouseConnStr",
                                                    "artifactId": "@pipeline().parameters.warehouseId",
                                                    "workspaceId": "@pipeline().parameters.workspaceId"
                                                }
                                            }
                                        },
                                        "type": "DataWarehouseTable",
                                        "schema": [],
                                        "typeProperties": {
                                            "schema": "dbo",
                                            "table": {
                                                "value": "@item().name",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                },
                                "enableStaging": true,
                                "translator": {
                                    "type": "TabularTranslator",
                                    "typeConversion": true,
                                    "typeConversionSettings": {
                                        "allowDataTruncation": true,
                                        "treatBooleanAsNumber": false
                                    }
                                }
                            }
                        },
                        {
                            "name": "Set table",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Set schema",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "variableName": "Tablename",
                                "value": {
                                    "value": "@item().name",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Set schema",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "variableName": "Schemaname",
                                "value": {
                                    "value": "@item().schema",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "lakehouseId": {
                "type": "string",
                "defaultValue": "0f0f6b7c-1761-41e6-896e-30014f16ff6d"
            },
            "tablesToCopy": {
                "type": "array",
                "defaultValue": [
                    {
                        "schema": "dbo",
                        "name": "Date"
                    },
                    {
                        "schema": "dbo",
                        "name": "Geography"
                    },
                    {
                        "schema": "dbo",
                        "name": "HackneyLicense"
                    },
                    {
                        "schema": "dbo",
                        "name": "Medallion"
                    },
                    {
                        "schema": "dbo",
                        "name": "Time"
                    },
                    {
                        "schema": "dbo",
                        "name": "Trip"
                    },
                    {
                        "schema": "dbo",
                        "name": "Weather"
                    }
                ]
            },
            "workspaceId": {
                "type": "string",
                "defaultValue": "1501143c-272f-4a2f-976a-7e55971e4c2b"
            },
            "warehouseId": {
                "type": "string",
                "defaultValue": "4d1bd951-99de-4bd7-b7bc-71c8f56db411"
            },
            "warehouseConnStr": {
                "type": "string",
                "defaultValue": "72wwbivi2ubejbrtmtaho32b4y-hqkacfjpe4xuvf3kpzkzohsmfm.datawarehouse.fabric.microsoft.com"
            },
            "lakehouseConnStr": {
                "type": "string",
                "defaultValue": "72wwbivi2ubejbrtmtaho32b4y-hqkacfjpe4xuvf3kpzkzohsmfm.datawarehouse.fabric.microsoft.com"
            }
        },
        "variables": {
            "Tablename": {
                "type": "String"
            },
            "Schemaname": {
                "type": "String"
            }
        },
        "lastModifiedByObjectId": "4aa20af7-94bd-4348-bef8-f8cbcd840d51",
        "lastPublishTime": "2024-11-13T15:52:52Z"
    }
}\", \n"," \"payloadType\": \"InlineBase64\" \n"," } \n"," ] \n"," } \n","} \n"," \n"," response = json.loads(client.post(dfurl,json= payload).content)\n"," return response['id']\n","\n","def getItemId(wks_id,itm_name,itm_type):\n"," df = fabric.list_items(type=None,workspace=wks_id)\n"," #print(df)\n"," if df.empty:\n"," return 'NotExists'\n"," else:\n"," #display(df)\n"," #print(df.query('\"Display Name\"=\"'+itm_name+'\"'))\n"," if itm_type != '':\n"," newdf= df.loc[(df['Display Name'] == itm_name) & (df['Type'] == itm_type)]['Id']\n"," else:\n"," newdf= df.loc[(df['Display Name'] == itm_name)]['Id'] \n"," if newdf.empty:\n"," return 'NotExists'\n"," else:\n"," return newdf.iloc[0]\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":true,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"e46210e9-58c9-483a-84ae-bbdc2ad1c37f"},{"cell_type":"markdown","source":["##### Either create shortcuts from source to target lakehouse(s) or copy data\n","\n","Loops through lakehouse(s) in the target workspace and either populates them with shortcuts or data\n","\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"a15065f3-670d-4bc9-b337-51709f6cdb1f"},{"cell_type":"code","source":["df_lhs = labs.list_lakehouses(source_ws)\n","for index, row in df_lhs.iterrows():\n","\n","\n"," if copy_lakehouse_data:\n"," df_lakehouses = (labs.list_lakehouses(source_ws))\n"," lh_name= row['Lakehouse Name']\n"," if lh_name.find('temp')==-1:\n"," # Gathers the list of recovers tables and source paths to be copied into the lakehouse associated with this notebook \n"," src_path = f'abfss://{source_ws}@onelake.dfs.fabric.microsoft.com/{lh_name}.Lakehouse'\n","\n"," table_list = get_lh_object_list(src_path)\n"," print(f'Attempting to copy table data for lakehouse {lh_name} from workspace {source_ws} to {target_ws}...')\n"," display(table_list)\n","\n"," #print('Copy Lakehouse Delta tables...')\n"," res = copy_lh_objects(table_list[table_list['type']=='table'],source_ws,target_ws,\n"," lh_name,lh_name,False,False)\n"," display(res)\n"," # Copy files\n"," print(f'Attempting to copy file data for lakehouse {lh_name} from workspace {source_ws} to {target_ws}...')\n","\n"," #print('Copy Lakehouse files...')\n"," res = copy_lh_objects(table_list[table_list['type']=='file'],source_ws,target_ws,\n"," lh_name,lh_name,False,False)\n"," display(res)\n"," print('Done.')\n","\n"," else:\n"," # fetch ID of source lakehouse based on name and workspace\n"," source_lh_id = fabric.resolve_item_id(\n"," item_name=row['Lakehouse Name'], type=\"Lakehouse\", workspace=source_ws\n"," )\n"," #target_lh_id = notebookutils.lakehouse.getWithProperties(name=current_lakehouse['default_lakehouse_name'], workspaceId=new_workspace_id)['id']\n","\n"," SOURCE_URI = f\"abfss://{source_ws_id}@onelake.dfs.fabric.microsoft.com/{source_lh_id}/Tables\"\n"," DEST_URI = f\"abfss://{target_ws_id}@onelake.dfs.fabric.microsoft.com/{row['Lakehouse ID']}/Tables\"\n","\n"," if PATTERN_MATCH is None or len(PATTERN_MATCH) == 0:\n"," raise TypeError(\"Argument 'PATTERN_MATCH' should be a valid list of patterns or [\"*\"] to match everything\")\n","\n"," # Collect created shortcuts\n"," result = []\n","\n"," # If either URI's are invalid, just return\n"," if not is_valid_onelake_uri(SOURCE_URI) or not is_valid_onelake_uri(DEST_URI):\n"," print(\n"," 
\"invalid URI's provided. URI's should be in the form: abfss://@onelake.dfs.fabric.microsoft.com//\"\n"," )\n"," else:\n"," # Remove any trailing '/' from uri's\n"," source_uri_addr = SOURCE_URI.rstrip(\"/\")\n"," dest_uri_addr = DEST_URI.rstrip(\"/\")\n","\n"," dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(\n"," dest_uri_addr\n"," )\n","\n"," # If we are not shortcutting to a managed table folder or\n"," # the source uri is a delta table, just shortcut it 1-1.\n"," if not dest_path.startswith(\"Tables\") or is_delta_table(source_uri_addr):\n"," shortcut = create_onelake_shorcut(source_uri_addr, dest_uri_addr)\n"," if shortcut is not None:\n"," result.append(shortcut)\n"," else:\n"," # If source is not a delta table, and destination is managed table folder:\n"," # Iterate over source folders and create table shortcuts @ destination\n"," for delta_table_uri in get_matching_delta_tables_uris(\n"," source_uri_addr, PATTERN_MATCH\n"," ):\n"," shortcut = create_onelake_shorcut(delta_table_uri, dest_uri_addr)\n"," if shortcut is not None:\n"," result.append(shortcut)\n"," print(result)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"8bec3fbc-4a75-4ba3-86d5-0620ec504a8f"},{"cell_type":"markdown","source":["##### Copy warehouse data via parameterised pipeline\n","\n","Loop through all warehouses and copy the data"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"f198b816-77e9-4f04-9139-d78237bedc72"},{"cell_type":"code","source":["p_logging_verbose = True\n","df_warehouses = (labs.list_warehouses(target_ws))\n","display(df_warehouses)\n","for index, row in df_warehouses.iterrows():\n"," source_wh_id = labs.resolve_warehouse_id(row['Warehouse Name'],source_ws_id)\n"," target_wh_id = labs.resolve_warehouse_id(row['Warehouse Name'],target_ws_id)\n"," \n"," src_path = f'abfss://'+source_ws_id+'@onelake.dfs.fabric.microsoft.com/'+source_wh_id\n"," tgt_path = f'abfss://'+target_ws_id+'@onelake.dfs.fabric.microsoft.com/'+target_wh_id\n","\n"," # extract the list of schemas per data \n"," schema_list = get_lh_object_list(src_path,['Tables'])\n"," # extract a list of warehouse objects per schema and store in a list\n"," table_list = get_wh_object_list(schema_list['name'],src_path)\n"," \n"," # create a temporary staging lakehouse per warehouse to create shortcuts into, \n"," # which point back to original warehouse data currently in the DR storage account\n"," lhname = 'temp_rlh_' + source_ws+'_'+row['Warehouse Name']\n"," # check if it exists before attempting create\n"," if p_logging_verbose:\n"," print('Checking whether the temporary lakehouse \"'+ lhname +'\" exists in workspace '+target_ws+'...')\n"," temp_lh_id = getItemId(target_ws_id,lhname,'Lakehouse')\n"," if temp_lh_id == 'NotExists':\n"," lhname = lhname[:256] # lakehouse name should not exceed 256 characters\n"," payload = payload = '{\"displayName\": \"' + lhname + '\",' \\\n"," + '\"description\": \"Interim staging lakehouse for primary warehouse recovery: ' \\\n"," + source_ws+'_'+row['Warehouse Name'] + 'into workspace '+ target_ws + '(' + target_ws +')\"}'\n"," try:\n"," lhurl = \"v1/workspaces/\" + target_ws_id + \"/lakehouses\"\n"," lhresponse = client.post(lhurl,json= json.loads(payload))\n"," temp_lh_id = 
lhresponse.json()['id']\n"," if p_logging_verbose:\n"," print('Temporary lakehouse \"'+ lhname +'\" created with Id ' + temp_lh_id + ': ' + str(lhresponse.status_code) + ' ' + str(lhresponse.text))\n"," except Exception as error:\n"," print(error.errorCode)\n"," else:\n"," if p_logging_verbose:\n"," print('Temporary lakehouse '+lhname+' (' + temp_lh_id + ') already exists.')\n"," \n"," time.sleep(60) # waiting for temporary lakehouse to provision completely \n","\n"," # Create shortcuts for every table in the format of schema_table under the tables folder\n"," for index,itable in table_list.iterrows():\n"," shortcutExists=False\n"," # Check if shortcut exists\n"," try:\n"," url = \"v1/workspaces/\" + target_ws_id + \"/items/\" + temp_lh_id + \"/shortcuts/Tables/\"+itable['schema']+'_'+itable['name']\n"," tlhresponse = client.get(url)\n"," shortcutExists = True\n"," if p_logging_verbose:\n"," print('Shortcut '+itable['schema']+'_'+itable['name'] +' already exists')\n"," except Exception as error:\n"," shortcutExists = False \n","\n"," if not shortcutExists: \n"," # Create shortcuts - one per table per schema\n"," url = \"v1/workspaces/\" + target_ws_id + \"/items/\" + temp_lh_id + \"/shortcuts\"\n"," scpayload = '{' \\\n"," '\"path\": \"Tables/\",' \\\n"," '\"name\": \"'+itable['schema']+'_'+itable['name']+'\",' \\\n"," '\"target\": {' \\\n"," '\"oneLake\": {' \\\n"," '\"workspaceId\": \"' + source_ws_id + '\",' \\\n"," '\"itemId\": \"'+ source_wh_id +'\",' \\\n"," '\"path\": \"/Tables/' + itable['schema']+'/'+itable['name'] + '\"' \\\n"," '}}}' \n"," try:\n"," #print(scpayload) \n"," shctresponse = client.post(url,json= json.loads(scpayload))\n"," if p_logging_verbose:\n"," print('Shortcut '+itable['schema']+'_'+itable['name'] + ' created.' )\n","\n"," except Exception as error:\n"," print('Error creating shortcut '+itable['schema']+'_'+itable['name']+' due to '+str(error) + ':' + shctresponse.text)\n"," \n"," recovery_pipeline_prefix= 'plRecover_WH' \n"," # recovery pipeline name should not exceed 256 characters\n"," recovery_pipeline = recovery_pipeline_prefix+'_'+source_ws + '_'+row['Warehouse Name'][:256]\n"," if p_logging_verbose:\n"," print('Attempting to deploy a copy pipeline in the target workspace to load the target warehouse tables from the shortcuts created above... 
')\n"," # Create the pipeline in the target workspace that loads the target warehouse from shortcuts created above \n"," plid = getItemId( target_ws_id,recovery_pipeline,'DataPipeline')\n"," #print(plid)\n"," if plid == 'NotExists':\n"," plid = createDWrecoverypl(target_ws_id,recovery_pipeline_prefix+'_'+source_ws + '_'+row['Warehouse Name'])\n"," if p_logging_verbose:\n"," print('Recovery pipeline ' + recovery_pipeline + ' created with Id '+plid)\n"," else:\n"," if p_logging_verbose:\n"," print('Datawarehouse recovery pipeline \"' + recovery_pipeline + '\" ('+plid+') already exist in workspace \"'+target_ws + '\" ('+target_ws_id+')') \n"," print('\\n')\n","\n"," tablesToCopyParam = table_list[['schema','name']].to_json( orient='records')\n"," # ensure the temporary lakehouse exists\n","\n"," # obtain the connection string for the lakehouse to pass to the copy pipeline\n"," whurl = \"v1/workspaces/\" + target_ws_id + \"/lakehouses/\" + temp_lh_id\n"," whresponse = client.get(whurl)\n"," lhconnStr = whresponse.json()['properties']['sqlEndpointProperties']['connectionString']\n","\n"," # get the SQLEndpoint ID of the lakehouse to pass to the copy pipeline\n"," items = fabric.list_items(workspace=target_ws_id)\n"," print(items)\n"," temp_lh_sqle_id = items[(items['Type'] == 'SQLEndpoint') & (items['Display Name']==lhname)]['Id'].values[0]\n","\n","\n"," # obtain the connection string for the warehouse to pass to the copy pipeline \n"," whurl = \"v1/workspaces/\" + target_ws_id + \"/warehouses/\" + target_wh_id\n"," whresponse = client.get(whurl)\n"," whconnStr = whresponse.json()['properties']['connectionInfo']\n","\n"," # obtain the pipeline id created to recover this warehouse\n"," plid = getItemId( target_ws_id,recovery_pipeline,'DataPipeline')\n"," if plid == 'NotExists':\n"," print('Error: Could not execute pipeline '+recovery_pipeline+ ' as the ID could not be obtained ')\n"," else:\n"," # pipeline url including pipeline Id unique to each warehouse\n"," plurl = 'v1/workspaces/'+target_ws_id+'/items/'+plid+'/jobs/instances?jobType=Pipeline'\n"," #print(plurl)\n","\n"," payload_data = '{' \\\n"," '\"executionData\": {' \\\n"," '\"parameters\": {' \\\n"," '\"lakehouseId\": \"' + temp_lh_sqle_id + '\",' \\\n"," '\"tablesToCopy\": ' + tablesToCopyParam + ',' \\\n"," '\"workspaceId\": \"' + target_ws_id +'\",' \\\n"," '\"warehouseId\": \"' + target_wh_id + '\",' \\\n"," '\"lakehouseConnStr\": \"' + lhconnStr + '\",' \\\n"," '\"warehouseConnStr\": \"' + whconnStr + '\"' \\\n"," '}}}'\n"," #print(payload_data)\n"," plresponse = client.post(plurl, json=json.loads(payload_data))\n"," if p_logging_verbose:\n"," print(str(plresponse.status_code)) \n","print('Done')\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"57dafef7-17a2-475f-9e62-eecc6660440c"},{"cell_type":"markdown","source":["##### Update directlake model lakehouse/warehouse connection\n","\n","https://semantic-link-labs.readthedocs.io/en/stable/sempy_labs.directlake.html#sempy_labs.directlake.update_direct_lake_model_connection "],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"cc97be77-116e-4cde-bdc6-2971ab98a083"},{"cell_type":"code","source":["\n","df_datasets = fabric.list_datasets(target_ws)\n","\n","# Iterate over each dataset in the dataframe\n","for 
index, row in df_datasets.iterrows():\n"," # Check if the dataset is not the default semantic model\n"," if not labs.is_default_semantic_model(row['Dataset Name'], fabric.resolve_workspace_id(target_ws)):\n"," print('Updating semantic model connection ' + row['Dataset Name'] + ' in workspace '+ target_ws)\n"," labs.directlake.update_direct_lake_model_connection(dataset=row['Dataset Name'], \n"," workspace= target_ws,\n"," source=labs.directlake.get_direct_lake_source(row['Dataset Name'], workspace= target_ws)[1], \n"," source_type=labs.directlake.get_direct_lake_source(row['Dataset Name'], workspace= target_ws)[0], \n"," source_workspace=target_ws)\n"," labs.refresh_semantic_model(dataset=row['Dataset Name'], workspace= target_ws)\n","\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"9deccda6-5c3d-4b88-8ed8-68855ca0949a"},{"cell_type":"markdown","source":["##### Rebind reports to local datasets\n","\n","https://semantic-link-labs.readthedocs.io/en/latest/sempy_labs.report.html#sempy_labs.report.report_rebind"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"36783f3b-4904-4d74-842d-dbd026a3184a"},{"cell_type":"code","source":["df_reports = fabric.list_reports(workspace=target_ws)\n","for index, row in df_reports.iterrows():\n"," #print(row['Name'] + '-' + row['Dataset Id'])\n"," df_datasets = fabric.list_datasets(workspace=target_ws)\n"," dataset_name = df_datasets[df_datasets['Dataset ID'] == row['Dataset Id']]['Dataset Name'].values[0]\n"," print(f'Rebinding report to {dataset_name} in {target_ws}')\n"," labs.report.report_rebind(report=row['Name'],dataset=dataset_name, report_workspace=target_ws, dataset_workspace=target_ws)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"06268ede-b795-493e-9a8d-772654ce7e20"},{"cell_type":"markdown","source":["##### Update data pipeline source & sink connections\n","\n","Support changes lakehouses, warehouses, notebooks and connections from source to target.
\n","Connections changes should be expressed as an array of tuples [{from_1:to_1},{from_N:to_N}]"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"4ae65012-350c-40c0-a68a-4069c567a85f"},{"cell_type":"code","source":["from typing import Optional\n","from sempy_labs._helper_functions import (\n"," resolve_workspace_name_and_id,\n"," lro,\n"," _decode_b64,\n",")\n","import sempy_labs._icons as icons\n","\n","import base64\n","from typing import Optional, Tuple, List\n","from uuid import UUID\n","\n","\n","def update_data_pipeline_definition(\n"," name: str, pipeline_content: dict, workspace: Optional[str] = None\n","):\n"," \"\"\"\n"," Updates an existing data pipeline with a new definition.\n","\n"," Parameters\n"," ----------\n"," name : str\n"," The name of the data pipeline.\n"," pipeline_content : dict\n"," The data pipeline content (not in Base64 format).\n"," workspace : str, default=None\n"," The name of the workspace.\n"," Defaults to None which resolves to the workspace of the attached lakehouse\n"," or if no lakehouse attached, resolves to the workspace of the notebook.\n"," \"\"\"\n","\n"," (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)\n"," client = fabric.FabricRestClient()\n"," pipeline_payload = base64.b64encode(json.dumps(pipeline_content).encode('utf-8')).decode('utf-8')\n"," pipeline_id = fabric.resolve_item_id(\n"," item_name=name, type=\"DataPipeline\", workspace=workspace\n"," )\n","\n"," request_body = {\n"," \"definition\": {\n"," \"parts\": [\n"," {\n"," \"path\": \"pipeline-content.json\",\n"," \"payload\": pipeline_payload,\n"," \"payloadType\": \"InlineBase64\"\n"," }\n"," ]\n"," }\n"," }\n","\n","\n"," response = client.post(\n"," f\"v1/workspaces/{workspace_id}/items/{pipeline_id}/updateDefinition\",\n"," json=request_body,\n"," )\n","\n"," lro(client, response, return_status_code=True)\n","\n"," print(\n"," f\"{icons.green_dot} The '{name}' pipeline was updated within the '{workspace}' workspace.\"\n"," )\n","\n","def _is_valid_uuid(\n"," guid: str,\n","):\n"," \"\"\"\n"," Validates if a string is a valid GUID in version 4\n","\n"," Parameters\n"," ----------\n"," guid : str\n"," GUID to be validated.\n","\n"," Returns\n"," -------\n"," bool\n"," Boolean that indicates if the string is a GUID or not.\n"," \"\"\"\n","\n"," try:\n"," UUID(str(guid), version=4)\n"," return True\n"," except ValueError:\n"," return False"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"bdd46d4a-ef58-4f9a-b2e8-a428361a17c1"},{"cell_type":"code","source":["import json\n","from jsonpath_ng import jsonpath, parse\n","from typing import Optional, Tuple, List\n","from uuid import UUID\n","\n","source_ws = ''\n","target_ws = ''\n","\n","\n","# Swaps the connection properties of an activity belonging to the specified item type(s)\n","def swap_pipeline_connection(pl_json: dict, p_source_ws: str,p_target_ws: str, \n"," p_item_type: List =['DataWarehouse','Lakehouse','Notebook'], \n"," p_conn_id_from_to: Optional[List[Tuple[str,str]]]=[]):\n"," \n"," source_ws_id = fabric.resolve_workspace_id(source_ws)\n","\n"," target_ws_id = fabric.resolve_workspace_id(target_ws)\n","\n"," if 'Warehouse' in p_item_type or 'Lakehouse' in p_item_type:\n"," ls_expr = parse('$..linkedService')\n"," for endpoint_match in 
ls_expr.find(pl_json):\n"," if endpoint_match.value['properties']['type'] == 'DataWarehouse' \\\n"," and endpoint_match.value['properties']['typeProperties']['workspaceId'] == source_ws_id \\\n"," and 'Warehouse' in p_item_type:\n"," # only update the warehouse if it was located in the source workspace i.e. we will update the properties to the target workspace if the warehouse resided in the same workspace as the pipeline\n"," #print(endpoint_match.value)\n"," warehouse_id = endpoint_match.value['properties']['typeProperties']['artifactId']\n"," #print(warehouse_id)\n"," warehouse_endpoint = endpoint_match.value['properties']['typeProperties']['endpoint']\n"," #print(warehouse_endpoint)\n"," \n"," source_wh_name = fabric.resolve_item_name(item_id = warehouse_id,workspace=source_ws_id)\n"," #print(remote_wh_name)\n"," # find the warehouse id of the warehouse with the same name in the target workspace\n"," target_wh_id = fabric.resolve_item_id(item_name = source_wh_name,type='Warehouse',workspace=target_ws_id)\n"," # look up the connection string for the warehouse in the target workspace\n"," whurl = f\"v1/workspaces/{target_ws_id}/warehouses/{target_wh_id}\"\n"," whresponse = client.get(whurl)\n"," lhconnStr = whresponse.json()['properties']['connectionString']\n"," endpoint_match.value['properties']['typeProperties']['artifactId'] = target_wh_id\n"," endpoint_match.value['properties']['typeProperties']['workspaceId'] = target_ws_id\n"," endpoint_match.value['properties']['typeProperties']['endpoint'] = lhconnStr\n"," #print(endpoint_match.value)\n"," ls_expr.update(endpoint_match,endpoint_match.value)\n"," if endpoint_match.value['properties']['type'] == 'Lakehouse' \\\n"," and endpoint_match.value['properties']['typeProperties']['workspaceId'] == source_ws_id \\\n"," and 'Lakehouse' in p_item_type:\n"," #print(endpoint_match.value)\n"," lakehouse_id = endpoint_match.value['properties']['typeProperties']['artifactId']\n"," remote_lh_name = fabric.resolve_item_name(item_id = lakehouse_id,workspace=source_ws_id)\n"," # find the lakehouse id of the lakehouse with the same name in the target workspace\n"," target_lh_id = fabric.resolve_item_id(item_name = remote_lh_name,type='Lakehouse',workspace=target_ws_id)\n"," endpoint_match.value['properties']['typeProperties']['artifactId'] = target_lh_id\n"," endpoint_match.value['properties']['typeProperties']['workspaceId'] = target_ws_id\n"," ls_expr.update(endpoint_match,endpoint_match.value)\n"," # print(endpoint_match.value)\n","\n","\n"," if 'Notebook' in p_item_type: \n"," ls_expr = parse('$..activities')\n","\n"," for endpoint_match in ls_expr.find(pl_json):\n"," for activity in endpoint_match.value:\n"," #print(activity['type'])\n"," if activity['type']=='TridentNotebook' and 'Notebook' in p_item_type: #only update if the notebook was in the same workspace as the pipeline\n"," print('change from '+activity['typeProperties']['workspaceId'])\n"," source_nb_id = activity['typeProperties']['notebookId']\n"," source_nb_name = fabric.resolve_item_name(item_id = source_nb_id,workspace=source_ws_id)\n"," target_nb_id = fabric.resolve_item_id(item_name = source_nb_name,type='Notebook',workspace=target_ws_id)\n"," activity['typeProperties']['notebookId']=target_nb_id\n"," activity['typeProperties']['workspaceId']=target_ws_id\n"," print('to notebook '+ target_nb_id)\n"," #ls_expr.update(endpoint_match,endpoint_match.value)\n","\n"," if p_conn_from_to:\n"," for ti_conn_from_to in p_conn_from_to:\n"," if not _is_valid_uuid(ti_conn_from_to[0]):\n"," 
print('Connection from is string '+ str(ti_conn_from_to[0]))\n"," dfC_filt = df_conns[df_conns[\"Connection Name\"] == ti_conn_from_to[0]] \n"," connId_from = dfC_filt['Connection Id'].iloc[0] \n"," else:\n"," connId_from = ti_conn_from_to[0]\n","\n"," if not _is_valid_uuid(ti_conn_from_to[1]):\n"," print('Connection from is string '+ str(ti_conn_from_to[1]))\n"," dfC_filt = df_conns[df_conns[\"Connection Name\"] == ti_conn_from_to[1]] \n"," connId_to = dfC_filt['Connection Id'].iloc[0] \n"," else:\n"," connId_to = ti_conn_from_to[1]\n","\n"," ls_expr = parse('$..externalReferences')\n"," for externalRef in ls_expr.find(pl_json):\n"," if externalRef.value['connection']==connId_from:\n"," print('Changing connection from '+str(connId_from))\n"," externalRef.value['connection']=connId_to\n"," ls_expr.update(externalRef,externalRef.value)\n"," print('to '+str(connId_to))\n","\n"," return pl_json\n","\n","\n","\n","# loading a dataframe of connections to perform an ID lookup if required \n","df_conns = labs.list_connections()\n","\n","df_pipeline = labs.list_data_pipelines(target_ws)\n","for index, row in df_pipeline.iterrows():\n"," #print(labs.get_data_pipeline_definition(row['Data Pipeline Name'],target_ws))\n"," if row['Data Pipeline Name']=='plRecover_WH6_Prod2_Warehouse2_fixed':\n"," pipeline_json = json.loads(labs.get_data_pipeline_definition(row['Data Pipeline Name'],source_ws))\n","\n"," p_new_json = swap_pipeline_connection(pipeline_json, source_ws,target_ws,\n"," ['DataWarehouse','Lakehouse','Notebook'],\n"," [p_connections_from_to]) \n"," #print(json.dumps(pipeline_json, indent=4))\n"," \n"," update_data_pipeline_definition(name=row['Data Pipeline Name'],pipeline_content=pipeline_json, workspace=target_ws)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"079958e8-2880-484a-a994-41caf47e747e"},{"cell_type":"markdown","source":["##### Commit changes made above to Git"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"44174276-b983-4e80-9451-0afb9589cf1f"},{"cell_type":"code","source":["labs.commit_to_git(comment='Initial', workspace=target_ws)"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"9a5c3d84-f71d-4348-b419-c4953ac9e1d0"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"widgets":{},"nteract":{"version":"nteract-front-end@1.0.0"},"synapse_widget":{"version":"0.1","state":{}},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{}}},"nbformat":4,"nbformat_minor":5}
\ No newline at end of file
+{"cells":[{"cell_type":"markdown","source":["##### Branch out to new workspace notebook - post activity\n","\n","After cloning a workspace, this notebook will reconfigure any references to the old workspace by rebinding them to the new workspace. \n","\n","For example a pipeline referencing a warehouse or a default lakehouse of a notebook.\n","\n","This notebook runs post activity tasks can be run after [branch out to new workspace functionality](https://blog.fabric.microsoft.com/en-us/blog/introducing-new-branching-capabilities-in-fabric-git-integration) or the [custom AzDO script](https://github.com/microsoft/fabric-toolbox/blob/main/accelerators/CICD/Branch-out-to-new-workspace/AzDO/scripts/BranchOut-Feature-Workspace-Automation.py).\n","\n","Summary of post activities in order:\n","\n","- Default lakehouses and warehouse are updated to local lakehouse/warehouses
\n","- Either creates shortcuts in local lakehouse back to tables in the source lakehouse, or copies the data from source lakehouse. Set via parameter below.
\n","- Copy warehouse data. Set via parameter below
\n","- Changes directlake semantic model connections for semantic models to \"local\" lakehouse/warehouse
\n","- Rebinds reports to \"local\" semantic models
\n","- Changes pipeline lakehouse/warehouse references to local item
\n","- Ability to swap connections in pipelines from old to new
\n","- Commit changes to git
\n","
\n","\n","Requirements:\n","\n","- Requires Semantic Link Labs installed by pip install below or added to environment library.
\n","- Requires JmesPath library for data pipeline JSON manipulation i.e. connection swaps.
\n","
\n","\n","Limitations of current script:\n","\n","\n","- Does not recreate item shares or external shortcuts
\n","- Does not re-apply lakehouse SQL Endpoint or Warehouse object/row/column level security
\n","- Does not recreate data access roles in Lakehouse
\n","- Untested with Lakehouses where with schema support enabled
\n","
\n","\n","\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"a98b6d0a-7a36-4116-ab0d-aa70144eb737"},{"cell_type":"markdown","source":["##### Install semantic link labs to support advanced functionality\n","\n","https://semantic-link-labs.readthedocs.io/en/latest/index.html\n","https://github.com/microsoft/semantic-link-labs/blob/main/README.md\n","\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"3b887bd6-a9c9-430f-b58f-b58a93f5ce29"},{"cell_type":"code","source":["%pip -q install semantic-link-labs\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":true},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"1b03316d-c088-4a0e-a2f0-44d45d112121"},{"cell_type":"markdown","source":["##### Install Jmespath to make data pipeline changes such as updating linked notebooks, warehouses and lakehouses "],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"8a74ed11-dd64-43bb-a735-906a947c8666"},{"cell_type":"code","source":["%pip install jmespath"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"c68be6ba-7648-457f-af82-f1987d12d7f7"},{"cell_type":"markdown","source":["##### Set these parameters if running as a standaone noteook\n","Before running this notebook ensure these parameters are set correctly. If necessary these can be passed in via a data factory pipeline.\n","\n","If running this notebook from Azure DevOps ignore these parameters as these will be passed from the release pipeline script"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"dee81614-b92b-4242-890a-b11f97b1a640"},{"cell_type":"code","source":["source_ws = ''\n","target_ws = ''\n","\n","# Either copy lakehouse data or create shortcuts, set at most one of these to True \n","copy_lakehouse_data = False\n","create_lakehouse_shortcuts = True\n","\n","# Option to copy warehouse data if required\n","copy_warehouse_data = False\n","\n","# If false then shortcuts will be created. 
If you wish to create shortcuts based on a pattern match please set the param below\n","# enter pattern match for creating shortcuts - see https://github.com/arasdk/fabric-code-samples/blob/main/shortcuts/fabric_shortcut_creator.py \n","PATTERN_MATCH = [\"*\"]\n","\n","# Set connections to be replaced from previous name or ID to new name or ID.\n","connections_from_to = () #('https://api.fabric.microsoft.com/v1/workspaces/ admin','4498340c-27cf-4c6e-a025-00e5de6b0726'),('4498340c-27cf-4c6e-a025-00e5de6b0726','https://api.fabric.microsoft.com/v1/workspaces/ admin'),('https://api.fabric.microsoft.com/v1/workspaces/ admin','4498340c-27cf-4c6e-a025-00e5de6b0726')\n","\n","# Determines whether lakehoues tables/shortcuts should be created before git sync due to warehouse dependencies \n","has_wh_views_on_lh = True\n","\n","# Set the Azure DevOps related parameters \n","project_name=''\n","repo_name=''\n","main_branch = ''\n","branch_name = ''\n","org_name = ''\n","ado_api_url = 'https://dev.azure.com/'\n","\n","# Azure Key Vault and Secret Name which stores the Azure DevOps PAT\n","key_vault_name = ''\n","secret_name = ''\n","target_capacity = '' # name or ID. leave empty '' to use the same capacity as source workspace\n","\n","\n","### Do not change these parameters ####\n","# internal parameter to allow the installation of Python libraries when being run programatically. See https://learn.microsoft.com/en-us/fabric/data-engineering/library-management#python-inline-installation\n","_inlineInstallationEnabled = True\n","\n","# indicates whether this script is called from Azure Devops\n","_runStandalone = True\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"tags":["parameters"]},"id":"90efaa4f-846d-4924-900e-258837a3467d"},{"cell_type":"markdown","source":["##### Library imports and fabric rest client setup\n","\n","https://learn.microsoft.com/en-us/python/api/semantic-link-sempy/sempy.fabric.fabricrestclient"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"4fb01e1d-ec4e-4c69-b544-66f6d8c5a475"},{"cell_type":"code","source":["import pandas as pd\n","import datetime, time\n","import re,json, fnmatch,os\n","import requests, base64\n","import sempy\n","import sempy.fabric as fabric\n","from sempy.fabric.exceptions import FabricHTTPException, WorkspaceNotFoundException\n","from pyspark.sql import DataFrame\n","from pyspark.sql.functions import col,current_timestamp,lit\n","import sempy_labs as labs\n","from sempy_labs import migration, directlake\n","from sempy_labs import lakehouse as lake\n","from sempy_labs import report as rep\n","from sempy_labs.tom import connect_semantic_model\n","from typing import Optional\n","from sempy_labs._helper_functions import (\n"," resolve_workspace_name_and_id,\n"," lro,\n"," _decode_b64,\n",")\n","import sempy_labs._icons as icons\n","\n","import base64\n","from typing import Optional, Tuple, List\n","from uuid import UUID\n","import ast\n","from jsonpath_ng import jsonpath, parse\n","\n","# instantiate the Fabric rest client\n","client = fabric.FabricRestClient()\n","\n","# get the current workspace ID based on the context of where this notebook is run from\n","thisWsId = notebookutils.runtime.context['currentWorkspaceId']\n","thisWsName = 
notebookutils.runtime.context['currentWorkspaceName']\n","\n","source_ws_id = fabric.resolve_workspace_id(source_ws)\n","target_ws_id = '' # will be set later"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"391624c1-b299-452d-9ebf-f32626d49970"},{"cell_type":"markdown","source":["##### Always run this cell\n","\n","Contains utility functions to support lakehouse and warehouse initialisation\n","\n","and\n","\n","Shortcut creator:\n","https://github.com/arasdk/fabric-code-samples/blob/main/shortcuts/fabric_shortcut_creator.py "],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"4e7d0414-515c-4627-a183-553ce4ccf8e5"},{"cell_type":"code","source":["#### \n","### Setup and AzDO functions \n","####\n","\n","\n","def _is_valid_uuid(\n"," guid: str,\n","):\n"," \"\"\"\n"," Validates if a string is a valid GUID in version 4\n","\n"," Parameters\n"," ----------\n"," guid : str\n"," GUID to be validated.\n","\n"," Returns\n"," -------\n"," bool\n"," Boolean that indicates if the string is a GUID or not.\n"," \"\"\"\n","\n"," try:\n"," UUID(str(guid), version=4)\n"," return True\n"," except ValueError:\n"," return False\n","\n","def get_capacity_status(p_target_cap):\n"," dfC = fabric.list_capacities()\n"," dfC_filt = dfC[dfC[\"Id\"] == p_target_cap]\n"," return dfC_filt['State'].iloc[0]\n","\n","\n","#TODO create token via SPN\n","def get_branch_object_id(p_ado_url,p_branch_name, p_token):\n"," try:\n"," print(f\"Retriving ID of main branch {branch_name} to be cloned \")\n"," headers = {'Authorization': f'Basic {p_token}',\n"," 'Content-Type': 'application/json'\n"," }\n","\n"," response = requests.get(f\"{p_ado_url}/heads/{p_branch_name}?api-version=7.1\", headers=headers)\n"," return response.json()[\"value\"][0][\"objectId\"]\n"," except requests.exceptions.RequestException as e:\n"," print(f\"Error getting branch object ID: {e}\")\n"," return None\n","\n","def encode_pat(pat):\n"," # Encode the PAT in base64\n"," encoded_pat = base64.b64encode(pat.encode('utf-8')).decode('utf-8')\n"," return encoded_pat\n","\n","def create_azdo_branch(p_key_vault_name,p_secret_name, p_branch_name, p_main_branch,p_repo_name,p_ado_url):\n"," access_token =notebookutils.credentials.getToken('keyvault')\n"," url = f'https://{p_key_vault_name}.vault.azure.net/secrets/{p_secret_name}?api-version=7.3'\n"," headers = {\n"," 'Authorization': f'Bearer {access_token}',\n"," 'Content-Type': 'application/json'\n"," }\n","\n"," response = requests.get(url, headers=headers)\n"," if response.status_code == 200:\n"," #print(response.json()['value'])\n"," pat_token = encode_pat(':'+response.json()['value'])\n"," else:\n"," raise ValueError(f\"Failed to get secret: {response.status_code} - {response.text}\")\n","\n"," try:\n"," print(f\"Creating feature branch {p_branch_name} based on {p_main_branch} in progress\")\n"," headers = {\"Authorization\": f\"Basic {pat_token}\", \"Content-Type\": \"application/json\"}\n"," data = [\n"," {\n"," \"name\":f\"refs/heads/{p_branch_name}\",\n"," \"oldObjectId\": \"0000000000000000000000000000000000000000\",\n"," \"newObjectId\": get_branch_object_id(p_ado_url,p_main_branch, pat_token)\n"," }\n"," ]\n"," response = requests.post(f\"{p_ado_url}?api-version=7.1\", headers=headers, json=data)\n"," response.raise_for_status()\n"," if 
(response.json()['value'][0]['success']):\n"," return True\n"," else:\n"," return False\n"," except requests.exceptions.RequestException as e:\n"," raise ValueError(f\"Error creating Azure DevOps branch: {e}\")\n","\n","\n","\n","##### \n","### Shortcut utility function \n","####\n","\n","# Extract workspace_id, item_id and path from a onelake URI\n","def extract_onelake_https_uri_components(uri):\n"," # Define a regular expression to match any string between slashes and capture the final path element(s) without the leading slash\n"," pattern = re.compile(r\"abfss://([^@]+)@[^/]+/([^/]+)/(.*)\")\n"," match = pattern.search(uri)\n"," if match:\n"," workspace_id, item_id, path = match.groups()\n"," return workspace_id, item_id, path\n"," else:\n"," return None, None, None\n","\n","\n","def is_valid_onelake_uri(uri: str) -> bool:\n"," workspace_id, item_id, path = extract_onelake_https_uri_components(uri)\n"," if \"abfss://\" not in uri or workspace_id is None or item_id is None or path is None:\n"," return False\n","\n"," return True\n","\n","\n","def get_last_path_segment(uri: str):\n"," path = uri.split(\"/\") # Split the entire URI by '/'\n"," return path[-1] if path else None\n","\n","\n","def is_delta_table(uri: str):\n"," delta_log_path = os.path.join(uri, \"_delta_log\")\n"," return mssparkutils.fs.exists(delta_log_path)\n","\n","\n","def get_onelake_shorcut(workspace_id: str, item_id: str, path: str, name: str):\n"," shortcut_uri = (\n"," f\"v1/workspaces/{workspace_id}/items/{item_id}/shortcuts/{path}/{name}\"\n"," )\n"," result = client.get(shortcut_uri).json()\n"," return result\n","\n","\n","def is_folder_matching_pattern(path: str, folder_name: str, patterns: []):\n"," if folder_name in patterns:\n"," return True\n"," else:\n"," for pattern in patterns:\n"," if fnmatch.fnmatch(folder_name, pattern):\n"," return is_delta_table(path)\n","\n"," return False\n","\n","\n","def get_matching_delta_tables_uris(uri: str, patterns: []) -> []:\n"," # Use a set to avoid duplicates\n"," matched_uris = set()\n"," files = mssparkutils.fs.ls(uri)\n"," folders = [item for item in files if item.isDir]\n","\n"," # Filter folders to only those that matches the pattern and is a delta table\n"," matched_uris.update(\n"," folder.path\n"," for folder in folders\n"," if is_folder_matching_pattern(folder.path, folder.name, patterns)\n"," )\n","\n"," return matched_uris\n","\n","\n","def create_onelake_shorcut(source_uri: str, dest_uri: str):\n"," src_workspace_id, src_item_id, src_path = extract_onelake_https_uri_components(\n"," source_uri\n"," )\n","\n"," dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(\n"," dest_uri\n"," )\n","\n"," name = get_last_path_segment(source_uri)\n"," dest_uri_joined = os.path.join(dest_uri, name)\n","\n"," # If the destination path already exists, return without creating shortcut\n"," if mssparkutils.fs.exists(dest_uri_joined):\n"," print(f\"Destination already exists: {dest_uri_joined}\")\n"," return None\n","\n"," request_body = {\n"," \"name\": name,\n"," \"path\": dest_path,\n"," \"target\": {\n"," \"oneLake\": {\n"," \"itemId\": src_item_id,\n"," \"path\": src_path,\n"," \"workspaceId\": src_workspace_id,\n"," }\n"," },\n"," }\n","\n"," shortcut_uri = f\"v1/workspaces/{dest_workspace_id}/items/{dest_item_id}/shortcuts\"\n"," print(f\"Creating shortcut: {shortcut_uri}/{name}..\")\n"," try:\n"," client.post(shortcut_uri, json=request_body)\n"," except FabricHTTPException as e:\n"," print(e)\n"," return None\n","\n"," return 
get_onelake_shorcut(dest_workspace_id, dest_item_id, dest_path, name)\n"," \n","\n","####\n","## Copy lakehouse and warehouse utility functions\n","####\n","\n","def get_lh_object_list(base_path,data_types = ['Tables', 'Files'])->pd.DataFrame:\n","\n"," '''\n"," Function to get a list of tables for a lakehouse\n"," adapted from https://fabric.guru/getting-a-list-of-folders-and-delta-tables-in-the-fabric-lakehouse\n"," This function will return a pandas dataframe containing names and abfss paths of each folder for Files and Tables\n"," '''\n"," #data_types = ['Tables', 'Files'] #for if you want a list of files and tables\n"," #data_types = ['Tables'] #for if you want a list of tables\n","\n"," df = pd.concat([\n"," pd.DataFrame({\n"," 'name': [item.name for item in notebookutils.fs.ls(f'{base_path}/{data_type}/')],\n"," 'type': data_type[:-1].lower() , \n"," 'src_path': [item.path for item in notebookutils.fs.ls(f'{base_path}/{data_type}/')],\n"," }) for data_type in data_types], ignore_index=True)\n","\n"," return df\n","\n","def get_wh_object_list(schema_list,base_path)->pd.DataFrame:\n","\n"," '''\n"," Function to get a list of tables for a warehouse by schema\n"," '''\n"," data_type = 'Tables'\n"," dfs = []\n","\n"," for schema_prefix in schema_list:\n"," if notebookutils.fs.exists(f'{base_path}/{data_type}/{schema_prefix}/'):\n"," items = notebookutils.fs.ls(f'{base_path}/{data_type}/{schema_prefix}/')\n"," if items: # Check if the list is not empty\n"," df = pd.DataFrame({\n"," 'schema': schema_prefix,\n"," 'name': [item.name for item in items],\n"," 'type': data_type[:-1].lower(),\n"," 'src_path': [item.path for item in items],\n"," })\n"," dfs.append(df)\n","\n"," if dfs: # Check if the list of dataframes is not empty\n"," df = pd.concat(dfs, ignore_index=True)\n"," else:\n"," df = pd.DataFrame() # Return an empty dataframe if no dataframes were created\n","\n"," return df\n","\n","def copy_lh_objects(table_list,workspace_src,workspace_tgt,lakehouse_src,lakehouse_tgt,fastcopy=True,usingIDs=False)->pd.DataFrame:\n"," # declare an array to keep the instrumentation\n"," cpresult = []\n"," # loop through all the tables to extract the source path \n"," for table in table_list.src_path:\n"," source = table\n"," destination = source.replace(f'abfss://{workspace_src}', f'abfss://{workspace_tgt}')\n"," if usingIDs:\n"," destination = destination.replace(f'{lakehouse_src}', f'{lakehouse_tgt}')\n"," else:\n"," destination = destination.replace(f'{lakehouse_src}.Lakehouse', f'{lakehouse_tgt}.Lakehouse')\n"," start_time = datetime.datetime.now()\n"," if notebookutils.fs.exists(destination):\n"," notebookutils.fs.rm(destination, True)\n"," if fastcopy:\n"," # use fastcopy util which is a python wrapper to azcopy\n"," notebookutils.fs.fastcp(source+'/*', destination+'/', True)\n"," else:\n"," notebookutils.fs.cp(source, destination, True)\n","\n"," # recording the timing and add it to the results list\n"," end_time = datetime.datetime.now()\n"," copyreslist = [source, destination, start_time.strftime(\"%Y-%m-%d %H:%M:%S\"), end_time.strftime(\"%Y-%m-%d %H:%M:%S\"), str((end_time - start_time).total_seconds())]\n"," cpresult.append(copyreslist)\n"," return pd.DataFrame(cpresult,columns =['source--------------------------------------','target--------------------------------------','start------------','end_time------------','elapsed seconds----'])\n","\n","def createDWrecoverypl(ws_id,pl_name = 'Recover_Warehouse_Data_From_DR'):\n"," client = fabric.FabricRestClient()\n","\n"," dfurl= 
\"v1/workspaces/\"+ ws_id + \"/items\"\n"," payload = { \n"," \"displayName\": pl_name, \n"," \"type\": \"DataPipeline\", \n"," \"definition\": { \n"," \"parts\": [ \n"," { \n"," \"path\": \"pipeline-content.json\", \n"," \"payload\": \"{
    "properties": {
        "activities": [
            {
                "name": "IterateSchemaTables",
                "type": "ForEach",
                "dependsOn": [],
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.tablesToCopy",
                        "type": "Expression"
                    },
                    "batchCount": 20,
                    "activities": [
                        {
                            "name": "CopyWarehouseTables",
                            "type": "Copy",
                            "dependsOn": [
                                {
                                    "activity": "Set table",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 2,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "source": {
                                    "type": "DataWarehouseSource",
                                    "queryTimeout": "02:00:00",
                                    "partitionOption": "None",
                                    "datasetSettings": {
                                        "annotations": [],
                                        "linkedService": {
                                            "name": "07a03006_d1b6_4a39_beb1_0bba2aaf5ff7",
                                            "properties": {
                                                "annotations": [],
                                                "type": "DataWarehouse",
                                                "typeProperties": {
                                                    "endpoint": "@pipeline().parameters.lakehouseConnStr",
                                                    "artifactId": "@pipeline().parameters.lakehouseId",
                                                    "workspaceId": "@pipeline().parameters.workspaceId"
                                                }
                                            }
                                        },
                                        "type": "DataWarehouseTable",
                                        "schema": [],
                                        "typeProperties": {
                                            "schema": "dbo",
                                            "table": {
                                                "value": "@concat(concat(item().schema,'_'),item().name)",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                },
                                "sink": {
                                    "type": "DataWarehouseSink",
                                    "allowCopyCommand": true,
                                    "tableOption": "autoCreate",
                                    "datasetSettings": {
                                        "annotations": [],
                                        "linkedService": {
                                            "name": "0c03123a_d312_46c4_a8e7_5b4cad8f12d7",
                                            "properties": {
                                                "annotations": [],
                                                "type": "DataWarehouse",
                                                "typeProperties": {
                                                    "endpoint": "@pipeline().parameters.warehouseConnStr",
                                                    "artifactId": "@pipeline().parameters.warehouseId",
                                                    "workspaceId": "@pipeline().parameters.workspaceId"
                                                }
                                            }
                                        },
                                        "type": "DataWarehouseTable",
                                        "schema": [],
                                        "typeProperties": {
                                            "schema": "dbo",
                                            "table": {
                                                "value": "@item().name",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                },
                                "enableStaging": true,
                                "translator": {
                                    "type": "TabularTranslator",
                                    "typeConversion": true,
                                    "typeConversionSettings": {
                                        "allowDataTruncation": true,
                                        "treatBooleanAsNumber": false
                                    }
                                }
                            }
                        },
                        {
                            "name": "Set table",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Set schema",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "variableName": "Tablename",
                                "value": {
                                    "value": "@item().name",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "Set schema",
                            "type": "SetVariable",
                            "dependsOn": [],
                            "policy": {
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "typeProperties": {
                                "variableName": "Schemaname",
                                "value": {
                                    "value": "@item().schema",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            }
        ],
        "parameters": {
            "lakehouseId": {
                "type": "string",
                "defaultValue": "0f0f6b7c-1761-41e6-896e-30014f16ff6d"
            },
            "tablesToCopy": {
                "type": "array",
                "defaultValue": [
                    {
                        "schema": "dbo",
                        "name": "Date"
                    },
                    {
                        "schema": "dbo",
                        "name": "Geography"
                    },
                    {
                        "schema": "dbo",
                        "name": "HackneyLicense"
                    },
                    {
                        "schema": "dbo",
                        "name": "Medallion"
                    },
                    {
                        "schema": "dbo",
                        "name": "Time"
                    },
                    {
                        "schema": "dbo",
                        "name": "Trip"
                    },
                    {
                        "schema": "dbo",
                        "name": "Weather"
                    }
                ]
            },
            "workspaceId": {
                "type": "string",
                "defaultValue": "1501143c-272f-4a2f-976a-7e55971e4c2b"
            },
            "warehouseId": {
                "type": "string",
                "defaultValue": "4d1bd951-99de-4bd7-b7bc-71c8f56db411"
            },
            "warehouseConnStr": {
                "type": "string",
                "defaultValue": "72wwbivi2ubejbrtmtaho32b4y-hqkacfjpe4xuvf3kpzkzohsmfm.datawarehouse.fabric.microsoft.com"
            },
            "lakehouseConnStr": {
                "type": "string",
                "defaultValue": "72wwbivi2ubejbrtmtaho32b4y-hqkacfjpe4xuvf3kpzkzohsmfm.datawarehouse.fabric.microsoft.com"
            }
        },
        "variables": {
            "Tablename": {
                "type": "String"
            },
            "Schemaname": {
                "type": "String"
            }
        },
        "lastModifiedByObjectId": "4aa20af7-94bd-4348-bef8-f8cbcd840d51",
        "lastPublishTime": "2024-11-13T15:52:52Z"
    }
}\", \n"," \"payloadType\": \"InlineBase64\" \n"," } \n"," ] \n"," } \n","} \n"," \n"," response = json.loads(client.post(dfurl,json= payload).content)\n"," return response['id']\n","\n","def getItemId(wks_id,itm_name,itm_type):\n"," df = fabric.list_items(type=None,workspace=wks_id)\n"," #print(df)\n"," if df.empty:\n"," return 'NotExists'\n"," else:\n"," #display(df)\n"," #print(df.query('\"Display Name\"=\"'+itm_name+'\"'))\n"," if itm_type != '':\n"," newdf= df.loc[(df['Display Name'] == itm_name) & (df['Type'] == itm_type)]['Id']\n"," else:\n"," newdf= df.loc[(df['Display Name'] == itm_name)]['Id'] \n"," if newdf.empty:\n"," return 'NotExists'\n"," else:\n"," return newdf.iloc[0]\n","\n","\n","# modified semantic link labs function - pendin PR\n","import sempy_labs._icons as icons\n","from typing import Optional, List\n","from sempy_labs._helper_functions import (\n"," resolve_workspace_name_and_id,\n"," _base_api,\n",")\n","from uuid import UUID\n","\n","def initialize_git_connection(workspace: Optional[str | UUID] = None, initialization_strategy: Optional[str] = 'None') -> str:\n"," \"\"\"\n"," Initializes a connection for a workspace that is connected to Git.\n","\n"," This is a wrapper function for the following API: `Git - Initialize Connection `_.\n","\n"," Parameters\n"," ----------\n"," workspace : str | uuid.UUID, default=None\n"," The Fabric workspace name or ID.\n"," Defaults to None which resolves to the workspace of the attached lakehouse\n"," or if no lakehouse attached, resolves to the workspace of the notebook.\n"," initialization_strategy : str\n"," Defines the strategy required for an initialization process when content \n"," exists on both the remote side and the workspace side. \n"," Additional strategies may be added over time.\n"," Supported options: PreferWorkspace, PreferRemote or None (default)\n","\n"," Returns\n"," -------\n"," str\n"," Remote full SHA commit hash.\n"," \"\"\"\n","\n"," (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)\n","\n","\n"," payload = {\n"," \"initializationStrategy\": initialization_strategy\n"," }\n"," response_json = _base_api(\n"," request=f\"/v1/workspaces/{workspace_id}/git/initializeConnection\",\n"," payload=payload,\n"," method=\"post\",\n"," lro_return_json=True,\n"," status_codes=None,\n"," )\n"," \n"," print(\n"," f\"{icons.green_dot} The '{workspace_name}' workspace git connection has been initialized.\"\n"," )\n"," return response_json.get(\"remoteCommitHash\")\n","\n","\n","\n","def initialize_git_connection_int(workspace: Optional[str | UUID] = None, initialization_strategy: Optional[str] = 'None') -> str:\n"," \"\"\"\n"," Initializes a connection for a workspace that is connected to Git.\n","\n"," This is a wrapper function for the following API: `Git - Initialize Connection `_.\n","\n"," Parameters\n"," ----------\n"," workspace : str | uuid.UUID, default=None\n"," The Fabric workspace name or ID.\n"," Defaults to None which resolves to the workspace of the attached lakehouse\n"," or if no lakehouse attached, resolves to the workspace of the notebook.\n"," initialization_strategy : str\n"," Defines the strategy required for an initialization process when content \n"," exists on both the remote side and the workspace side. 
\n"," Additional strategies may be added over time.\n"," Supported options: PreferWorkspace, PreferRemote or None (default)\n","\n"," Returns\n"," -------\n"," str\n"," Remote full SHA commit hash.\n"," \"\"\"\n","\n"," (workspace_name, workspace_id) = resolve_workspace_name_and_id(workspace)\n","\n","\n"," #payload = {\n"," # \"initializationStrategy\": initialization_strategy\n"," #}\n"," url = f\"https://wabi-west-us3-a-primary-redirect.analysis.windows.net/metadata/git/workspaces/{target_ws_id}/connection/initialize\"\n"," #response_json = _base_api(\n"," # request=f\"/v1/workspaces/{workspace_id}/git/initializeConnection\",\n"," # payload=payload,\n"," # method=\"post\",\n"," # lro_return_json=True,\n"," # status_codes=None,\n"," #)\n"," from notebookutils import credentials\n","\n"," data = {\"mergePolicy\":2}\n"," # Get the Fabric token\n"," token = credentials.getToken(\"pbi\")\n","\n"," # Print the token\n"," #print(token)\n"," access_token = token\n"," headers = {'content-type': 'application/json', 'authorization': f'Bearer {access_token}'}\n"," response = requests.post(url, headers=headers, json=data)\n","\n"," print(\n"," f\"{icons.green_dot} The '{workspace_name}' workspace git connection has been initialized.\"\n"," )\n"," return response.json().get(\"partialSyncBaseCommit\")\n"," #return response_json.get(\"remoteCommitHash\")\n","\n","\n","def create_shortcuts_or_copy_data(p_copy_lakehouse_data,p_create_lakehouse_shortcuts):\n"," df_lhs = labs.list_lakehouses(source_ws)\n"," for index, row in df_lhs.iterrows():\n","\n","\n"," if copy_lakehouse_data:\n"," lh_name= row['Lakehouse Name']\n"," if lh_name.find('temp')==-1:\n"," # Gathers the list of tables and source paths to be copied into the target lakehouse \n"," src_path = f'abfss://{source_ws}@onelake.dfs.fabric.microsoft.com/{lh_name}.Lakehouse'\n","\n"," table_list = get_lh_object_list(src_path)\n"," print(f'Attempting to copy table data for lakehouse {lh_name} from workspace {source_ws} to {target_ws}...')\n"," display(table_list)\n","\n"," #print('Copy Lakehouse Delta tables...')\n"," res = copy_lh_objects(table_list[table_list['type']=='table'],source_ws,target_ws,\n"," lh_name,lh_name,False,False)\n"," display(res)\n"," # Copy files\n"," print(f'Attempting to copy file data for lakehouse {lh_name} from workspace {source_ws} to {target_ws}...')\n","\n"," #print('Copy Lakehouse files...')\n"," res = copy_lh_objects(table_list[table_list['type']=='file'],source_ws,target_ws,\n"," lh_name,lh_name,False,False)\n"," display(res)\n"," print('Done.')\n","\n"," else:\n"," # fetch ID of source lakehouse based on name and workspace\n"," source_lh_id = row['Lakehouse ID']\n"," target_lh_id = fabric.resolve_item_id(\n"," item_name=row['Lakehouse Name'], type=\"Lakehouse\", workspace=target_ws\n"," )\n","\n"," SOURCE_URI = f\"abfss://{source_ws_id}@onelake.dfs.fabric.microsoft.com/{source_lh_id}/Tables\"\n"," DEST_URI = f\"abfss://{target_ws_id}@onelake.dfs.fabric.microsoft.com/{target_lh_id}/Tables\"\n","\n"," if PATTERN_MATCH is None or len(PATTERN_MATCH) == 0:\n"," raise TypeError(\"Argument 'PATTERN_MATCH' should be a valid list of patterns or [\"*\"] to match everything\")\n","\n"," # Collect created shortcuts\n"," result = []\n","\n"," # If either URI's are invalid, just return\n"," if not is_valid_onelake_uri(SOURCE_URI) or not is_valid_onelake_uri(DEST_URI):\n"," print(\n"," \"invalid URI's provided. 
URI's should be in the form: abfss://@onelake.dfs.fabric.microsoft.com//\"\n"," )\n"," else:\n"," # Remove any trailing '/' from uri's\n"," source_uri_addr = SOURCE_URI.rstrip(\"/\")\n"," dest_uri_addr = DEST_URI.rstrip(\"/\")\n","\n"," dest_workspace_id, dest_item_id, dest_path = extract_onelake_https_uri_components(\n"," dest_uri_addr\n"," )\n","\n"," # If we are not shortcutting to a managed table folder or\n"," # the source uri is a delta table, just shortcut it 1-1.\n"," if not dest_path.startswith(\"Tables\") or is_delta_table(source_uri_addr):\n"," shortcut = create_onelake_shorcut(source_uri_addr, dest_uri_addr)\n"," if shortcut is not None:\n"," result.append(shortcut)\n"," else:\n"," # If source is not a delta table, and destination is managed table folder:\n"," # Iterate over source folders and create table shortcuts @ destination\n"," for delta_table_uri in get_matching_delta_tables_uris(\n"," source_uri_addr, PATTERN_MATCH\n"," ):\n"," shortcut = create_onelake_shorcut(delta_table_uri, dest_uri_addr)\n"," if shortcut is not None:\n"," result.append(shortcut)\n"," print(result)\n","\n","\n","########\n","### Pipeline utilities\n","########\n","\n","def update_data_pipeline_definition(\n"," name: str, pipeline_content: dict, workspace: Optional[str] = None\n","):\n"," \"\"\"\n"," Updates an existing data pipeline with a new definition.\n","\n"," Parameters\n"," ----------\n"," name : str\n"," The name of the data pipeline.\n"," pipeline_content : dict\n"," The data pipeline content (not in Base64 format).\n"," workspace : str, default=None\n"," The name of the workspace.\n"," Defaults to None which resolves to the workspace of the attached lakehouse\n"," or if no lakehouse attached, resolves to the workspace of the notebook.\n"," \"\"\"\n","\n"," (workspace, workspace_id) = resolve_workspace_name_and_id(workspace)\n"," client = fabric.FabricRestClient()\n"," pipeline_payload = base64.b64encode(json.dumps(pipeline_content).encode('utf-8')).decode('utf-8')\n"," pipeline_id = fabric.resolve_item_id(\n"," item_name=name, type=\"DataPipeline\", workspace=workspace\n"," )\n","\n"," request_body = {\n"," \"definition\": {\n"," \"parts\": [\n"," {\n"," \"path\": \"pipeline-content.json\",\n"," \"payload\": pipeline_payload,\n"," \"payloadType\": \"InlineBase64\"\n"," }\n"," ]\n"," }\n"," }\n","\n","\n"," response = client.post(\n"," f\"v1/workspaces/{workspace_id}/items/{pipeline_id}/updateDefinition\",\n"," json=request_body,\n"," )\n","\n"," lro(client, response, return_status_code=True)\n","\n"," print(\n"," f\"{icons.green_dot} The '{name}' pipeline was updated within the '{workspace}' workspace.\"\n"," )\n","\n","# Swaps the connection properties of an activity belonging to the specified item type(s)\n","def swap_pipeline_connection(pl_json: dict, p_source_ws: str,p_target_ws: str, \n"," p_item_type: List =['DataWarehouse','Lakehouse','Notebook'], \n"," p_conn_from_to: Optional[List[Tuple[str,str]]]=[]):\n"," \n"," source_ws_id = fabric.resolve_workspace_id(source_ws)\n","\n"," target_ws_id = fabric.resolve_workspace_id(target_ws)\n","\n"," if 'Warehouse' in p_item_type or 'Lakehouse' in p_item_type:\n"," ls_expr = parse('$..linkedService')\n"," for endpoint_match in ls_expr.find(pl_json):\n"," if endpoint_match.value['properties']['type'] == 'DataWarehouse' \\\n"," and endpoint_match.value['properties']['typeProperties']['workspaceId'] == source_ws_id \\\n"," and 'Warehouse' in p_item_type:\n"," # only update the warehouse if it was located in the source workspace i.e. 
we will update the properties to the target workspace if the warehouse resided in the same workspace as the pipeline\n"," warehouse_id = endpoint_match.value['properties']['typeProperties']['artifactId']\n"," warehouse_endpoint = endpoint_match.value['properties']['typeProperties']['endpoint']\n"," \n"," source_wh_name = fabric.resolve_item_name(item_id = warehouse_id,workspace=source_ws_id)\n"," # find the warehouse id of the warehouse with the same name in the target workspace\n"," target_wh_id = fabric.resolve_item_id(item_name = source_wh_name,type='Warehouse',workspace=target_ws_id)\n"," # look up the connection string for the warehouse in the target workspace\n"," whurl = f\"v1/workspaces/{target_ws_id}/warehouses/{target_wh_id}\"\n"," whresponse = client.get(whurl)\n"," lhconnStr = whresponse.json()['properties']['connectionString']\n"," endpoint_match.value['properties']['typeProperties']['artifactId'] = target_wh_id\n"," endpoint_match.value['properties']['typeProperties']['workspaceId'] = target_ws_id\n"," endpoint_match.value['properties']['typeProperties']['endpoint'] = lhconnStr\n"," ls_expr.update(endpoint_match,endpoint_match.value)\n"," if endpoint_match.value['properties']['type'] == 'Lakehouse' \\\n"," and endpoint_match.value['properties']['typeProperties']['workspaceId'] == source_ws_id \\\n"," and 'Lakehouse' in p_item_type:\n"," #print(endpoint_match.value)\n"," lakehouse_id = endpoint_match.value['properties']['typeProperties']['artifactId']\n"," remote_lh_name = fabric.resolve_item_name(item_id = lakehouse_id,workspace=source_ws_id)\n"," # find the lakehouse id of the lakehouse with the same name in the target workspace\n"," target_lh_id = fabric.resolve_item_id(item_name = remote_lh_name,type='Lakehouse',workspace=target_ws_id)\n"," endpoint_match.value['properties']['typeProperties']['artifactId'] = target_lh_id\n"," endpoint_match.value['properties']['typeProperties']['workspaceId'] = target_ws_id\n"," ls_expr.update(endpoint_match,endpoint_match.value)\n"," # print(endpoint_match.value)\n","\n","\n"," if 'Notebook' in p_item_type: \n"," ls_expr = parse('$..activities')\n","\n"," for endpoint_match in ls_expr.find(pl_json):\n"," for activity in endpoint_match.value:\n"," #print(activity['type'])\n"," if activity['type']=='TridentNotebook' and 'Notebook' in p_item_type: #only update if the notebook was in the same workspace as the pipeline\n"," print('change from '+activity['typeProperties']['workspaceId'])\n"," source_nb_id = activity['typeProperties']['notebookId']\n"," source_nb_name = fabric.resolve_item_name(item_id = source_nb_id,workspace=source_ws_id)\n"," target_nb_id = fabric.resolve_item_id(item_name = source_nb_name,type='Notebook',workspace=target_ws_id)\n"," activity['typeProperties']['notebookId']=target_nb_id\n"," activity['typeProperties']['workspaceId']=target_ws_id\n"," print('to notebook '+ target_nb_id)\n"," #ls_expr.update(endpoint_match,endpoint_match.value)\n","\n"," if p_conn_from_to:\n"," for ti_conn_from_to in p_conn_from_to:\n"," if not _is_valid_uuid(ti_conn_from_to[0]):\n"," print('Connection from is string '+ str(ti_conn_from_to[0]))\n"," dfC_filt = df_conns[df_conns[\"Connection Name\"] == ti_conn_from_to[0]] \n"," connId_from = dfC_filt['Connection Id'].iloc[0] \n"," else:\n"," connId_from = ti_conn_from_to[0]\n","\n"," if not _is_valid_uuid(ti_conn_from_to[1]):\n"," print('Connection from is string '+ str(ti_conn_from_to[1]))\n"," dfC_filt = df_conns[df_conns[\"Connection Name\"] == ti_conn_from_to[1]] \n"," connId_to = 
dfC_filt['Connection Id'].iloc[0] \n"," else:\n"," connId_to = ti_conn_from_to[1]\n","\n"," ls_expr = parse('$..externalReferences')\n"," for externalRef in ls_expr.find(pl_json):\n"," if externalRef.value['connection']==connId_from:\n"," print('Changing connection from '+str(connId_from))\n"," externalRef.value['connection']=connId_to\n"," ls_expr.update(externalRef,externalRef.value)\n"," print('to '+str(connId_to))\n","\n"," return pl_json\n","\n"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"},"jupyter":{"source_hidden":true}},"id":"1ec311ff-c557-4e50-84d2-c873b789a5da"},{"cell_type":"markdown","source":["##### Standalone mode: Create AzDO branch and workspace\n","\n","This cell runs only if not invoked from AzDO pipeline"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"0e1ff516-120f-4bc3-803c-5d558e336787"},{"cell_type":"code","source":["if _runStandalone:\n"," # get capacity details\n"," if target_capacity != '':\n"," if _is_valid_uuid(target_capacity):\n"," target_capacity_id = target_capacity.lower()\n"," else: \n"," target_capacity_id = labs.resolve_capacity_id(target_capacity)\n"," else: # if not set then fetch capacity of source workspace\n"," target_capacity_id = labs.resolve_workspace_capacity(source_ws)[0]\n"," target_capacity = labs.resolve_capacity_name(target_capacity_id)\n"," # check capacity status\n"," cap_status = get_capacity_status(target_capacity_id)\n"," if cap_status == 'Inactive':\n"," raise ValueError(f\"Status of capacity {target_capacity} is {cap_status}. Please resume the capacity and retry\")\n"," else:\n"," print(f\"Status of capacity {target_capacity} is {cap_status}\")\n","\n","\n","\n"," # Create Azure DevOps Branch\n"," ado_api_url = f\"{ado_api_url}/{org_name}/{project_name}/_apis/git/repositories/{repo_name}/refs\"\n"," result = create_azdo_branch(key_vault_name,secret_name, branch_name, main_branch,repo_name, ado_api_url)\n"," if result:\n"," print(f\"Feature branch {branch_name} created\")\n"," else:\n"," raise ValueError(f\"Could not create branch {branch_name}, perhaps it already exists?\")\n","\n"," # Create new feature workspace\n"," try:\n"," print(f\"Creating workspace: \" + target_ws + \" in capacity \"+ target_capacity_id +\"...\",end=\"\")\n"," target_ws_id = fabric.create_workspace(target_ws,target_capacity_id) \n"," print(f\"done. New workspace ID = {target_ws_id = }\")\n"," except Exception as error:\n"," errmsg = f\"Failed to create workspace {target_ws} with capacity ID ({target_capacity_id}) due to: {str(error)}\"\n"," raise ValueError(errmsg)\n"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"6d334824-7c65-4a6f-9fb0-61267403aa36"},{"cell_type":"markdown","source":["##### If required, create Lakehouses in new feature workspace\n","\n","This cell will only run if there are warehouse views which depend on lakehouse tables. Determined by the parameter has_wh_views_on_lh set to True above.\n","
In this case the lakehouses will be pre-created. Shortcuts created or data copied done in subsequent cell"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"2d515be2-cc9f-4a09-9c10-f2ad12485c23"},{"cell_type":"code","source":["if has_wh_views_on_lh:\n"," lhs = fabric.list_items(type='Lakehouse',workspace=source_ws)\n"," for index,row in fabric.list_items(type='Lakehouse',workspace=source_ws).iterrows():\n"," #try:\n"," print(f\"Creating {row['Display Name']} lakehouse...\",end=\"\")\n"," fabric.create_lakehouse(display_name = row['Display Name'],description='Created programatically via branch out script',max_attempts=3, workspace=target_ws)\n"," print('done')"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"53dbce3a-a318-400c-acca-e3a8e9fd19ae"},{"cell_type":"markdown","source":["##### Either create shortcuts from source to target lakehouse(s) or copy data\n","\n","Loops through lakehouse(s) in the target workspace and either populates them with shortcuts or data\n","\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"a15065f3-670d-4bc9-b337-51709f6cdb1f"},{"cell_type":"code","source":["# ensure target workspace ID is set based on name\n","if target_ws_id == '':\n"," target_ws_id = fabric.resolve_workspace_id(target_ws)\n","\n","# populate the lakehouses with shortcuts or data\n","create_shortcuts_or_copy_data(copy_lakehouse_data,create_lakehouse_shortcuts)\n","time.sleep(120) # wait for sql endpoint to update\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"8bec3fbc-4a75-4ba3-86d5-0620ec504a8f"},{"cell_type":"markdown","source":["##### If not done in ADO, connect workspace to Git, initialize and update\n","This cell will only run if there are warehouse views which depend on lakehouse tables based on parameter has_wh_views_on_lh"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"c7dd8a46-f6bd-4a84-90f0-e176357ced17"},{"cell_type":"code","source":["if has_wh_views_on_lh:\n","\n"," # branch name should be the same name as the target workspace, if not modify parameters to pass in the branch name from ADO\n"," branch_name = target_ws \n"," # fetch existing Git connection details from source workspace\n"," gitconnx = labs.get_git_connection(source_ws)\n"," # connect target workspace to the same git provider and new branch\n"," if gitconnx.loc[0, 'Git Provider Type'] == 'AzureDevOps':\n"," result = labs.connect_workspace_to_azure_dev_ops(\n"," gitconnx.loc[0, 'Organization Name'],\n"," gitconnx.loc[0, 'Project Name'],\n"," gitconnx.loc[0, 'Repository Name'],\n"," branch_name,\n"," gitconnx.loc[0, 'Directory Name'],\n"," target_ws)\n"," elif gitconnx.loc[0, 'Git Provider Type'] == 'GitHub': #### TODO ####\n"," #TODO connect workspace with Github details\n"," labs.connect_workspace_to_github(\n"," '', #owner_name: str,\n"," gitconnx.loc[0, 'Repository Name'], #repository_name: str,\n"," branch_name,\n"," gitconnx.loc[0, 'Directory Name'], #directory_name: str,\n"," '', #connection_id: UUID,\n"," '', #source: str = \"ConfiguredConnection\",\n"," target_ws)\n"," else:\n"," raise 
ValueError(f\"Unsupported Git provider type {gitconnx.loc[0, 'Git Provider Type']}.\")"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"93fe3d56-d870-433f-bdea-1e50ca58440b"},{"cell_type":"code","source":["if has_wh_views_on_lh:\n"," # inialize the git connection\n"," commit_hash = initialize_git_connection(target_ws,'PreferRemote')\n","\n","\n"," # update from git \n"," labs.update_from_git(remote_commit_hash=commit_hash,\n"," conflict_resolution_policy='PreferRemote', \n"," allow_override=True,\n"," workspace=target_ws)\n","\n"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"b0061d77-6213-4eeb-a55f-5a870077628c"},{"cell_type":"code","source":["labs.commit_to_git(comment='Initial', workspace=target_ws)"],"outputs":[],"execution_count":null,"metadata":{"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"89d2fa99-1073-44a9-94cc-74e74abf68b4"},{"cell_type":"markdown","source":["##### Update default and attached lakehouses/warehouses for notebooks\n","\n","Update notebook dependencies based on but now supports T-SQL notebooks:\n","https://github.com/PowerBiDevCamp/FabConWorkshopSweden/blob/main/DemoFiles/GitUpdateWorkspace/updateWorkspaceDependencies_v1.ipynb\n"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"aaae8a08-588d-4dd8-9d2c-2200b7a88d30"},{"cell_type":"code","source":["for notebook in notebookutils.notebook.list(workspaceId=target_ws_id):\n"," updates = False\n"," if notebook.displayName == 'ETL':#True: #notebook.displayName == 'T-SQL_Notebook': #notebook.displayName != 'Create Feature Branch':\n","\n"," # Get the current notebook definition\n"," json_payload = json.loads(notebookutils.notebook.getDefinition(notebook.displayName,workspaceId=source_ws_id))\n"," #print(json.dumps(json_payload, indent=4))\n"," # Check for any attached lakehouses\n"," if 'dependencies' in json_payload['metadata'] \\\n"," and 'lakehouse' in json_payload['metadata']['dependencies'] \\\n"," and json_payload['metadata'][\"dependencies\"][\"lakehouse\"] is not None:\n"," # Extract attached and default lakehouses\n"," current_lakehouse = json_payload['metadata']['dependencies']['lakehouse']\n"," # if default lakehouse setting exists\n"," if 'default_lakehouse_name' in current_lakehouse:\n"," print(f\"Updating notebook {notebook.displayName} with new default lakehouse: {current_lakehouse['default_lakehouse_name']} in workspace {target_ws}\")\n"," source_lh_name = fabric.resolve_item_name(item_id = current_lakehouse['default_lakehouse'],type='Lakehouse',workspace=source_ws_id)\n"," current_lakehouse['default_lakehouse'] = fabric.resolve_item_id(item_name = source_lh_name,type='Lakehouse',workspace=target_ws_id)\n"," current_lakehouse['default_lakehouse_workspace_id'] = target_ws_id\n"," updates = True\n"," # loop through all attached lakehouess\n"," for lakehouse in json_payload['metadata']['dependencies']['lakehouse']['known_lakehouses']:\n"," source_lh_id = lakehouse['id']\n"," # find source lakehouse name\n"," source_lh_name = fabric.resolve_item_name(item_id = lakehouse['id'],type='Lakehouse',workspace=source_ws_id)\n"," # find target lakehouse id based on name\n"," target_lh_id = fabric.resolve_item_id(item_name = source_lh_name,type='Lakehouse',workspace=target_ws_id)\n"," lakehouse['id'] = target_lh_id\n"," print(f'Updating 
attached lakehouse {source_lh_name} from {source_lh_id} to target ID {target_lh_id}')\n"," updates = True\n","\n"," if 'dependencies' in json_payload['metadata'] and 'warehouse' in json_payload['metadata']['dependencies']:\n"," # Fetch existing details\n"," current_warehouse = json_payload['metadata']['dependencies']['warehouse']\n"," current_warehouse_id = current_warehouse['default_warehouse']\n"," source_wh_name = fabric.resolve_item_name(item_id = current_warehouse_id,workspace=source_ws_id)\n"," #print('Source warehouse name is ' + source_wh_name)\n"," target_wh_id = fabric.resolve_item_id(item_name = source_wh_name,type='Warehouse',workspace=target_ws_id)\n","\n"," if 'default_warehouse' in current_warehouse:\n"," #json_payload['metadata']['dependencies']['warehouse'] = {}\n"," print(f\"Attempting to update notebook {notebook.displayName} with new default warehouse: {target_wh_id} in {target_ws}\")\n"," \n"," json_payload['metadata']['dependencies']['warehouse']['default_warehouse'] = target_wh_id\n"," for warehouse in json_payload['metadata']['dependencies']['warehouse']['known_warehouses']:\n"," if warehouse['id'] == current_warehouse_id:\n"," warehouse['id'] = target_wh_id\n"," updates = True\n","\n"," if updates:\n"," notebookutils.notebook.updateDefinition(\n"," name = notebook.displayName,\n"," content = json.dumps(json_payload),\n"," workspaceId = target_ws_id\n"," )\n"," \n"," print(f\"Updated notebook {notebook.displayName} in {target_ws}\")\n","\n"," else:\n"," print(f'No default lakehouse set for notebook {notebook.displayName}, ignoring.')"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"5c60b5d2-f83c-46f8-9870-9fd609166b67"},{"cell_type":"markdown","source":["##### Copy warehouse data via parameterised pipeline\n","\n","Loop through all warehouses and copy the data"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"f198b816-77e9-4f04-9139-d78237bedc72"},{"cell_type":"code","source":["if copy_warehouse_data:\n"," p_logging_verbose = False\n"," df_warehouses = (labs.list_warehouses(target_ws))\n"," #display(df_warehouses)\n"," for index, row in df_warehouses.iterrows():\n"," source_wh_id = labs.resolve_warehouse_id(row['Warehouse Name'],source_ws_id)\n"," target_wh_id = labs.resolve_warehouse_id(row['Warehouse Name'],target_ws_id)\n"," \n"," src_path = f'abfss://'+source_ws_id+'@onelake.dfs.fabric.microsoft.com/'+source_wh_id\n"," tgt_path = f'abfss://'+target_ws_id+'@onelake.dfs.fabric.microsoft.com/'+target_wh_id\n","\n"," # extract the list of schemas per data \n"," schema_list = get_lh_object_list(src_path,['Tables'])\n"," # extract a list of warehouse objects per schema and store in a list\n"," table_list = get_wh_object_list(schema_list['name'],src_path)\n"," \n"," # create a temporary staging lakehouse per warehouse to create shortcuts into, \n"," # which point back to original warehouse data currently in the DR storage account\n"," lhname = 'temp_rlh_' + source_ws+'_'+row['Warehouse Name']\n"," # check if it exists before attempting create\n"," if p_logging_verbose:\n"," print('Checking whether the temporary lakehouse \"'+ lhname +'\" exists in workspace '+target_ws+'...')\n"," temp_lh_id = getItemId(target_ws_id,lhname,'Lakehouse')\n"," if temp_lh_id == 'NotExists':\n"," lhname = lhname[:256] # lakehouse name 
should not exceed 256 characters\n"," payload = payload = '{\"displayName\": \"' + lhname + '\",' \\\n"," + '\"description\": \"Interim staging lakehouse for primary warehouse recovery: ' \\\n"," + source_ws+'_'+row['Warehouse Name'] + 'into workspace '+ target_ws + '(' + target_ws +')\"}'\n"," try:\n"," lhurl = \"v1/workspaces/\" + target_ws_id + \"/lakehouses\"\n"," lhresponse = client.post(lhurl,json= json.loads(payload))\n"," temp_lh_id = lhresponse.json()['id']\n"," if p_logging_verbose:\n"," print('Temporary lakehouse \"'+ lhname +'\" created with Id ' + temp_lh_id + ': ' + str(lhresponse.status_code) + ' ' + str(lhresponse.text))\n"," except Exception as error:\n"," print(error.errorCode)\n"," else:\n"," if p_logging_verbose:\n"," print('Temporary lakehouse '+lhname+' (' + temp_lh_id + ') already exists.')\n"," \n"," time.sleep(60) # waiting for temporary lakehouse to provision completely \n","\n"," # Create shortcuts for every table in the format of schema_table under the tables folder\n"," for index,itable in table_list.iterrows():\n"," shortcutExists=False\n"," # Check if shortcut exists\n"," try:\n"," url = \"v1/workspaces/\" + target_ws_id + \"/items/\" + temp_lh_id + \"/shortcuts/Tables/\"+itable['schema']+'_'+itable['name']\n"," tlhresponse = client.get(url)\n"," shortcutExists = True\n"," if p_logging_verbose:\n"," print('Shortcut '+itable['schema']+'_'+itable['name'] +' already exists')\n"," except Exception as error:\n"," shortcutExists = False \n","\n"," if not shortcutExists: \n"," # Create shortcuts - one per table per schema\n"," url = \"v1/workspaces/\" + target_ws_id + \"/items/\" + temp_lh_id + \"/shortcuts\"\n"," scpayload = '{' \\\n"," '\"path\": \"Tables/\",' \\\n"," '\"name\": \"'+itable['schema']+'_'+itable['name']+'\",' \\\n"," '\"target\": {' \\\n"," '\"oneLake\": {' \\\n"," '\"workspaceId\": \"' + source_ws_id + '\",' \\\n"," '\"itemId\": \"'+ source_wh_id +'\",' \\\n"," '\"path\": \"/Tables/' + itable['schema']+'/'+itable['name'] + '\"' \\\n"," '}}}' \n"," try:\n"," #print(scpayload) \n"," shctresponse = client.post(url,json= json.loads(scpayload))\n"," if p_logging_verbose:\n"," print('Shortcut '+itable['schema']+'_'+itable['name'] + ' created.' )\n","\n"," except Exception as error:\n"," print('Error creating shortcut '+itable['schema']+'_'+itable['name']+' due to '+str(error) + ':' + shctresponse.text)\n"," \n"," recovery_pipeline_prefix= 'plRecover_WH' \n"," # recovery pipeline name should not exceed 256 characters\n"," recovery_pipeline = recovery_pipeline_prefix+'_'+source_ws + '_'+row['Warehouse Name'][:256]\n"," if p_logging_verbose:\n"," print('Attempting to deploy a copy pipeline in the target workspace to load the target warehouse tables from the shortcuts created above... 
')\n"," # Create the pipeline in the target workspace that loads the target warehouse from shortcuts created above \n"," plid = getItemId( target_ws_id,recovery_pipeline,'DataPipeline')\n"," #print(plid)\n"," if plid == 'NotExists':\n"," plid = createDWrecoverypl(target_ws_id,recovery_pipeline_prefix+'_'+source_ws + '_'+row['Warehouse Name'])\n"," if p_logging_verbose:\n"," print('Recovery pipeline ' + recovery_pipeline + ' created with Id '+plid)\n"," else:\n"," if p_logging_verbose:\n"," print('Datawarehouse recovery pipeline \"' + recovery_pipeline + '\" ('+plid+') already exist in workspace \"'+target_ws + '\" ('+target_ws_id+')') \n"," print('\\n')\n","\n"," tablesToCopyParam = table_list[['schema','name']].to_json( orient='records')\n"," # ensure the temporary lakehouse exists\n","\n"," # obtain the connection string for the lakehouse to pass to the copy pipeline\n"," whurl = \"v1/workspaces/\" + target_ws_id + \"/lakehouses/\" + temp_lh_id\n"," whresponse = client.get(whurl)\n"," lhconnStr = whresponse.json()['properties']['sqlEndpointProperties']['connectionString']\n","\n"," # get the SQLEndpoint ID of the lakehouse to pass to the copy pipeline\n"," items = fabric.list_items(workspace=target_ws_id)\n"," #print(items)\n"," temp_lh_sqle_id = items[(items['Type'] == 'SQLEndpoint') & (items['Display Name']==lhname)]['Id'].values[0]\n","\n","\n"," # obtain the connection string for the warehouse to pass to the copy pipeline \n"," whurl = \"v1/workspaces/\" + target_ws_id + \"/warehouses/\" + target_wh_id\n"," whresponse = client.get(whurl)\n"," whconnStr = whresponse.json()['properties']['connectionInfo']\n","\n"," # obtain the pipeline id created to recover this warehouse\n"," plid = getItemId( target_ws_id,recovery_pipeline,'DataPipeline')\n"," if plid == 'NotExists':\n"," print('Error: Could not execute pipeline '+recovery_pipeline+ ' as the ID could not be obtained ')\n"," else:\n"," # pipeline url including pipeline Id unique to each warehouse\n"," plurl = 'v1/workspaces/'+target_ws_id+'/items/'+plid+'/jobs/instances?jobType=Pipeline'\n"," #print(plurl)\n","\n"," payload_data = '{' \\\n"," '\"executionData\": {' \\\n"," '\"parameters\": {' \\\n"," '\"lakehouseId\": \"' + temp_lh_sqle_id + '\",' \\\n"," '\"tablesToCopy\": ' + tablesToCopyParam + ',' \\\n"," '\"workspaceId\": \"' + target_ws_id +'\",' \\\n"," '\"warehouseId\": \"' + target_wh_id + '\",' \\\n"," '\"lakehouseConnStr\": \"' + lhconnStr + '\",' \\\n"," '\"warehouseConnStr\": \"' + whconnStr + '\"' \\\n"," '}}}'\n"," #print(payload_data)\n"," plresponse = client.post(plurl, json=json.loads(payload_data))\n"," if p_logging_verbose:\n"," print(str(plresponse.status_code)) \n"," print('Done')\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":true,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"57dafef7-17a2-475f-9e62-eecc6660440c"},{"cell_type":"markdown","source":["##### Update directlake model lakehouse/warehouse connection\n","\n","https://semantic-link-labs.readthedocs.io/en/stable/sempy_labs.directlake.html#sempy_labs.directlake.update_direct_lake_model_connection "],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"cc97be77-116e-4cde-bdc6-2971ab98a083"},{"cell_type":"code","source":["\n","df_datasets = fabric.list_datasets(target_ws)\n","\n","# Iterate over each dataset in the dataframe\n","for 
index, row in df_datasets.iterrows():\n"," try:\n"," # Check if the dataset is not the default semantic model\n"," if not labs.is_default_semantic_model(row['Dataset Name'], fabric.resolve_workspace_id(target_ws)):\n"," #print('Updating semantic model connection ' + row['Dataset Name'] + ' in workspace '+ target_ws)\n"," labs.directlake.update_direct_lake_model_connection(dataset=row['Dataset Name'], \n"," workspace= target_ws,\n"," source=labs.directlake.get_direct_lake_source(row['Dataset Name'], workspace= target_ws)[1], \n"," source_type=labs.directlake.get_direct_lake_source(row['Dataset Name'], workspace= target_ws)[0], \n"," source_workspace=target_ws)\n"," labs.refresh_semantic_model(dataset=row['Dataset Name'], workspace= target_ws)\n"," except Exception as error:\n"," errmsg = f\"Failed to update and refresh semantic model {row['Dataset Name']} due to: {str(error)}\"\n"," print(errmsg)\n"," #raise ValueError(errmsg)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"9deccda6-5c3d-4b88-8ed8-68855ca0949a"},{"cell_type":"markdown","source":["##### Rebind reports to local datasets\n","\n","https://semantic-link-labs.readthedocs.io/en/latest/sempy_labs.report.html#sempy_labs.report.report_rebind"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"36783f3b-4904-4d74-842d-dbd026a3184a"},{"cell_type":"code","source":["df_reports = fabric.list_reports(workspace=target_ws)\n","for index, row in df_reports.iterrows():\n"," #print(row['Name'] + '-' + row['Dataset Id'])\n"," df_datasets = fabric.list_datasets(workspace=target_ws)\n"," dataset_name = df_datasets[df_datasets['Dataset ID'] == row['Dataset Id']]['Dataset Name'].values[0]\n"," print(f'Rebinding report to {dataset_name} in {target_ws}')\n"," labs.report.report_rebind(report=row['Name'],dataset=dataset_name, report_workspace=target_ws, dataset_workspace=target_ws)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"},"collapsed":false},"id":"06268ede-b795-493e-9a8d-772654ce7e20"},{"cell_type":"markdown","source":["##### Update data pipeline source & sink connections\n","\n","Support changes lakehouses, warehouses, notebooks and connections from source to target.
\n","Connections changes should be expressed as an array of tuples [{from_1:to_1},{from_N:to_N}]"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"4ae65012-350c-40c0-a68a-4069c567a85f"},{"cell_type":"code","source":["if len(connections_from_to)>0: \n"," # convert from a string to a proper type i.e. list of tuples \n"," #connections_from_to = ast.literal_eval(connections_from_to)\n"," # loading a dataframe of connections to perform an ID lookup if required \n"," df_conns = labs.list_connections()\n","\n"," df_pipeline = labs.list_data_pipelines(source_ws)\n"," for index, row in df_pipeline.iterrows():\n"," pipeline_json = json.loads(labs.get_data_pipeline_definition(row['Data Pipeline Name'],source_ws))\n","\n"," p_new_json = swap_pipeline_connection(pipeline_json, source_ws,target_ws,\n"," ['DataWarehouse','Lakehouse','Notebook'],\n"," [connections_from_to]) \n"," #print(json.dumps(pipeline_json, indent=4))\n"," \n"," update_data_pipeline_definition(name=row['Data Pipeline Name'],pipeline_content=pipeline_json, workspace=target_ws)\n"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"079958e8-2880-484a-a994-41caf47e747e"},{"cell_type":"markdown","source":["##### Commit changes made above to Git"],"metadata":{"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"44174276-b983-4e80-9451-0afb9589cf1f"},{"cell_type":"code","source":["labs.commit_to_git(comment='Initial', workspace=target_ws)"],"outputs":[],"execution_count":null,"metadata":{"jupyter":{"source_hidden":false,"outputs_hidden":false},"nteract":{"transient":{"deleting":false}},"microsoft":{"language":"python","language_group":"synapse_pyspark"}},"id":"9a5c3d84-f71d-4348-b419-c4953ac9e1d0"}],"metadata":{"kernel_info":{"name":"synapse_pyspark"},"kernelspec":{"name":"synapse_pyspark","language":"Python","display_name":"Synapse PySpark"},"language_info":{"name":"python"},"microsoft":{"language":"python","language_group":"synapse_pyspark","ms_spell_check":{"ms_spell_check_language":"en"}},"widgets":{"application/vnd.jupyter.widget-state+json":{"version_major":2,"version_minor":0,"state":{"0bf5b65883d845b8a785f718f257f0fe":{"model_name":"LayoutModel","model_module":"@jupyter-widgets/base","model_module_version":"2.0.0","state":{}},"34636ca1188542389c57e315b907be79":{"model_name":"LayoutModel","model_module":"@jupyter-widgets/base","model_module_version":"2.0.0","state":{}},"729255d539d44c0da98b3f6e1c79f301":{"model_name":"LabelModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"value":"Initial 
text","layout":"IPY_MODEL_34b4c41de9ad476088e9e30f193fd2d3","style":"IPY_MODEL_57e49b7587e9459f9f53fc6e48e1ed70"}},"c70d5bf879c243a984ade0d098ab4ff5":{"model_name":"LabelStyleModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"description_width":"","font_size":null,"text_color":null,"font_family":null,"font_style":null,"font_variant":null,"font_weight":null,"text_decoration":null}},"4cd8bbf291024e7bb03f1fa73d110710":{"model_name":"LayoutModel","model_module":"@jupyter-widgets/base","model_module_version":"2.0.0","state":{}},"41f8d04c976245918a4be0806750173c":{"model_name":"LayoutModel","model_module":"@jupyter-widgets/base","model_module_version":"2.0.0","state":{}},"2529422048414fcabaaa68abefbd8d68":{"model_name":"LabelStyleModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"description_width":"","font_size":null,"text_color":null,"font_family":null,"font_style":null,"font_variant":null,"font_weight":null,"text_decoration":null}},"bbd0570346bf4891a6740d5c472f557d":{"model_name":"LabelModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"value":"⌛ Loading","layout":"IPY_MODEL_41f8d04c976245918a4be0806750173c","style":"IPY_MODEL_d464a36f82f14b8d92cd20b8b77b2d4c"}},"1130fc45eb4f47b1bf8a05201004a829":{"model_name":"LabelModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"value":"Initial text","layout":"IPY_MODEL_0bf5b65883d845b8a785f718f257f0fe","style":"IPY_MODEL_477e0a21d0ee429d847afc4f29265fa7"}},"1a9e3ec3c78245a58d7da90f00663d0f":{"model_name":"LabelModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"value":"Initial text","layout":"IPY_MODEL_34636ca1188542389c57e315b907be79","style":"IPY_MODEL_d13c5bb179b04d789caf440915ad4ab4"}},"638b47eed64847fba74e81e558f85aef":{"model_name":"LabelModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"value":"⌛ Loading","layout":"IPY_MODEL_4cd8bbf291024e7bb03f1fa73d110710","style":"IPY_MODEL_c70d5bf879c243a984ade0d098ab4ff5"}},"748fa550c8c34bf68caf6fb5480c2bcc":{"model_name":"LabelModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"value":"⌛ 
Loading","layout":"IPY_MODEL_31877256e23b46fb9fb9619833d4a58a","style":"IPY_MODEL_2529422048414fcabaaa68abefbd8d68"}},"d13c5bb179b04d789caf440915ad4ab4":{"model_name":"LabelStyleModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"description_width":"","font_size":null,"text_color":null,"font_family":null,"font_style":null,"font_variant":null,"font_weight":null,"text_decoration":null}},"57e49b7587e9459f9f53fc6e48e1ed70":{"model_name":"LabelStyleModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"description_width":"","font_size":null,"text_color":null,"font_family":null,"font_style":null,"font_variant":null,"font_weight":null,"text_decoration":null}},"34b4c41de9ad476088e9e30f193fd2d3":{"model_name":"LayoutModel","model_module":"@jupyter-widgets/base","model_module_version":"2.0.0","state":{}},"477e0a21d0ee429d847afc4f29265fa7":{"model_name":"LabelStyleModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"description_width":"","font_size":null,"text_color":null,"font_family":null,"font_style":null,"font_variant":null,"font_weight":null,"text_decoration":null}},"d464a36f82f14b8d92cd20b8b77b2d4c":{"model_name":"LabelStyleModel","model_module":"@jupyter-widgets/controls","model_module_version":"2.0.0","state":{"description_width":"","font_size":null,"text_color":null,"font_family":null,"font_style":null,"font_variant":null,"font_weight":null,"text_decoration":null}},"31877256e23b46fb9fb9619833d4a58a":{"model_name":"LayoutModel","model_module":"@jupyter-widgets/base","model_module_version":"2.0.0","state":{}}}}},"nteract":{"version":"nteract-front-end@1.0.0"},"synapse_widget":{"version":"0.1","state":{}},"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{"spark.synapse.nbs.session.timeout":"1200000"}}},"dependencies":{"lakehouse":{}}},"nbformat":4,"nbformat_minor":5}
\ No newline at end of file
diff --git a/accelerators/CICD/Branch-out-to-new-workspace/README.md b/accelerators/CICD/Branch-out-to-new-workspace/README.md
index f3d44ae..064824e 100644
--- a/accelerators/CICD/Branch-out-to-new-workspace/README.md
+++ b/accelerators/CICD/Branch-out-to-new-workspace/README.md
@@ -157,7 +157,7 @@ authentication method has been chosen:
11. This will display run-time parameters which can be modified as necessary:
-
+
a. Source workspace: Name of the dev workspace
b. Target workspace: Name of the new workspace to be created, which will also serve as the new branch name
@@ -167,8 +167,9 @@ a. Source workspace: Name of the dev workspace
f. Swap connections in pipelines: Specify connections to be replaced, using either connection ID or name, in the format (from1,to1),(from2,to2),...,(fromN,toN).
g. Enter Developer Email: Add the email address of the developer to be granted admin role on the new workspace
h. Enter Capacity ID: Enter the capacity ID of the new workspace if different from the default GUID.
-
i. Enter the branch name: Enter the source branch name which the new branch will be created from.
-
j. Click Run and monitor the release pipeline in Azure Devops and also the progress of the post activity notebook in the Fabric monitoring hub.
+
i. Enter the source branch name: Enter the name of the source branch from which the new branch will be created.
+
j. Enter the Git folder where Fabric content is stored. Leave as / if content is stored in root.
+
k. Click Run and monitor the release pipeline in Azure DevOps, as well as the progress of the post-activity notebook in the Fabric monitoring hub.
12. To debug and monitor the running YAML pipeline, click on the “BranchOut” job
diff --git a/accelerators/media/pipeline_params.png b/accelerators/media/pipeline_params.png
index 1d52d41..c111b87 100644
Binary files a/accelerators/media/pipeline_params.png and b/accelerators/media/pipeline_params.png differ