diff --git a/community templates/DataverseToAzureSQLDB/Sync Dataverse to Azure SQL DB.json b/community templates/DataverseToAzureSQLDB/Sync Dataverse to Azure SQL DB.json new file mode 100644 index 00000000..c64fca81 --- /dev/null +++ b/community templates/DataverseToAzureSQLDB/Sync Dataverse to Azure SQL DB.json @@ -0,0 +1 @@ +{"$schema":"http://schema.management.azure.com/schemas/2015-01-01/deploymentTemplate.json#","contentVersion":"1.0.0.0","parameters":{"workspaceName":{"type":"string","metadata":"Workspace name","defaultValue":"sawstx"},"DestinationSQLDB":{"type":"string"},"SourceSynapseServerlessDB":{"type":"string"}},"variables":{"workspaceId":"[concat('Microsoft.Synapse/workspaces/', parameters('workspaceName'))]"},"resources":[{"name":"[concat(parameters('workspaceName'), '/Sync Orchestration')]","type":"Microsoft.Synapse/workspaces/pipelines","apiVersion":"2019-06-01-preview","properties":{"description":"Main pipeline that orchestrates the synchronization process and calls other pipelines as needed.","activities":[{"name":"Get tables to process","description":"Get a list of tables that need to be synchronized from a control table.","type":"Lookup","dependsOn":[{"activity":"Auto-Populate List of Tables to Sync","dependencyConditions":["Succeeded"]}],"policy":{"timeout":"7.00:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"AzureSqlSource","sqlReaderStoredProcedureName":"[[orchestration].[GetTablesToProcess]","storedProcedureParameters":{"TableGroup":{"type":"Int16","value":{"value":"@pipeline().parameters.TableGroupToSync","type":"Expression"}}},"queryTimeout":"02:00:00","partitionOption":"None"},"dataset":{"referenceName":"DataverseSQLDB","type":"DatasetReference","parameters":{"SchemaName":"n/a","TableName":"n/a"}},"firstRowOnly":false}},{"name":"For each table","description":"Determine whether to perform a full or an incremental copy and execute an appropriate child pipeline.","type":"ForEach","dependsOn":[{"activity":"Get tables to process","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"items":{"value":"@activity('Get tables to process').output.value","type":"Expression"},"isSequential":false,"activities":[{"name":"Separate Full Copy from Incremental Copy Tables","type":"IfCondition","dependsOn":[{"activity":"Execute Schema Drift Handler","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@item().IsIncremental","type":"Expression"},"ifFalseActivities":[{"name":"Execute Full Copy Pipeline","type":"ExecutePipeline","dependsOn":[],"userProperties":[],"typeProperties":{"pipeline":{"referenceName":"Table Full Copy","type":"PipelineReference"},"waitOnCompletion":true,"parameters":{"SourceSchema":{"value":"@item().SourceSchema","type":"Expression"},"SourceTable":{"value":"@item().SourceTable","type":"Expression"},"TargetSchema":{"value":"@item().TargetSchema","type":"Expression"},"TargetTable":{"value":"@item().TargetTable","type":"Expression"},"TableId":{"value":"@item().TableId","type":"Expression"},"KeyColumns":{"value":"@item().KeyColumns","type":"Expression"}}}}],"ifTrueActivities":[{"name":"Execute Incremental Copy Pipeline","type":"ExecutePipeline","dependsOn":[],"userProperties":[],"typeProperties":{"pipeline":{"referenceName":"Table Incremental 
Copy","type":"PipelineReference"},"waitOnCompletion":true,"parameters":{"SourceSchema":{"value":"@item().SourceSchema","type":"Expression"},"SourceTable":{"value":"@item().SourceTable","type":"Expression"},"TargetSchema":{"value":"@item().TargetSchema","type":"Expression"},"TargetTable":{"value":"@item().TargetTable","type":"Expression"},"LowWatermark":{"value":"@item().LowWatermark","type":"Expression"},"HighWatermark":{"value":"@item().HighWatermark","type":"Expression"},"KeyColumn":{"value":"@item().KeyColumns","type":"Expression"},"TableId":{"value":"@item().TableId","type":"Expression"},"GeneratePrimaryKey":{"value":"@and(pipeline().parameters.AutoGeneratePrimaryKeys, item().NeedsPrimaryKey)","type":"Expression"}}}}]}},{"name":"Execute Schema Drift Handler","type":"ExecutePipeline","dependsOn":[],"userProperties":[],"typeProperties":{"pipeline":{"referenceName":"Handle Schema Drift","type":"PipelineReference"},"waitOnCompletion":true,"parameters":{"SourceSchema":{"value":"@item().SourceSchema","type":"Expression"},"SourceTable":{"value":"@item().SourceTable","type":"Expression"},"TargetSchema":{"value":"@item().TargetSchema","type":"Expression"},"TargetTable":{"value":"@item().TargetTable","type":"Expression"}}}},{"name":"Add a primary key to the target table","description":"If requested and necessary, add a primary key to the target table.","type":"IfCondition","dependsOn":[{"activity":"Separate Full Copy from Incremental Copy Tables","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@and(pipeline().parameters.AutoGeneratePrimaryKeys, item().NeedsPrimaryKey)","type":"Expression"},"ifTrueActivities":[{"name":"Generate Primary Key","type":"SqlServerStoredProcedure","dependsOn":[],"policy":{"timeout":"7.00:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"storedProcedureName":"[[orchestration].[GeneratePrimaryKey]","storedProcedureParameters":{"KeyColumns":{"value":{"value":"@item().KeyColumns","type":"Expression"},"type":"String"},"Schema":{"value":{"value":"@item().TargetSchema","type":"Expression"},"type":"String"},"Table":{"value":{"value":"@item().TargetTable","type":"Expression"},"type":"String"}}},"linkedServiceName":{"referenceName":"[parameters('DestinationSQLDB')]","type":"LinkedServiceReference"}}]}}]}},{"name":"Auto-Populate List of Tables to Sync","description":"Check pipeline parameter setting to determine whether to auto-populate list of tables to sync.","type":"IfCondition","dependsOn":[{"activity":"Add orchestration objects to target DB","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"expression":{"value":"@pipeline().parameters.AutoPopulateListOfTablesToSync","type":"Expression"},"ifTrueActivities":[{"name":"Populate List of Tables to Sync","type":"ExecutePipeline","dependsOn":[],"userProperties":[],"typeProperties":{"pipeline":{"referenceName":"Populate List of Tables to Sync","type":"PipelineReference"},"waitOnCompletion":true,"parameters":{"UsePartitionedTables":{"value":"@pipeline().parameters.UsePartitionedTables","type":"Expression"},"TargetSchema":{"value":"@pipeline().parameters.DefaultTargetSchema","type":"Expression"}}}}]}},{"name":"Add orchestration objects to target DB","description":"Add helper objects to the target 
database","type":"Script","dependsOn":[],"policy":{"timeout":"7.00:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"linkedServiceName":{"referenceName":"[parameters('DestinationSQLDB')]","type":"LinkedServiceReference"},"typeProperties":{"scripts":[{"type":"Query","text":"/****** Object: Schema [orchestration] ******/\nIF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = N'orchestration')\nEXEC sp_executesql @stmt = N'CREATE SCHEMA [orchestration]'\n\n/****** Object: Schema [staging] ******/\nIF NOT EXISTS (SELECT name FROM sys.schemas WHERE name = N'staging')\nEXEC sp_executesql @stmt = N'CREATE SCHEMA [staging]'\n\n/****** Object: Table [orchestration].[ProcessingControl] ******/\nIF OBJECT_ID('[orchestration].[ProcessingControl]', 'U') IS NULL\nEXEC sp_executesql @stmt = N'\nCREATE TABLE [orchestration].[ProcessingControl](\n\t[TableId] [int] IDENTITY(1,1) NOT NULL,\n\t[SourceSchema] [nvarchar](128) NOT NULL,\n\t[SourceTable] [nvarchar](128) NOT NULL,\n\t[TargetSchema] [nvarchar](128) NOT NULL,\n\t[TargetTable] [nvarchar](128) NOT NULL,\n\t[KeyColumns] [nvarchar](4000) NULL,\n\t[IsIncremental] [bit] NOT NULL,\n\t[TableGroup] [smallint] NOT NULL DEFAULT (1),\n\t[IsEnabled] [bit] NOT NULL DEFAULT (1)\n CONSTRAINT [PK_ProcessingControl] PRIMARY KEY CLUSTERED \n(\n\t[TableId] ASC\n)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]\n) ON [PRIMARY]\n'\n/****** Object: Table [orchestration].[ProcessingLog] ******/\nIF OBJECT_ID('[orchestration].[ProcessingLog]', 'U') IS NULL\nEXEC sp_executesql @stmt = N'\nCREATE TABLE [orchestration].[ProcessingLog](\n\t[TableId] [int] NOT NULL,\n\t[PipelineRunId] [varchar](50) NOT NULL,\n\t[ProcessingStarted] [datetime2](7) NOT NULL,\n\t[ProcessingEnded] [datetime2](7) NULL,\n\t[LowWatermark] [datetime2](7) NULL,\n\t[HighWatermark] [datetime2](7) NULL,\n\t[RowsCopied] int NULL,\n\t[IsSuccessful] bit NULL\n CONSTRAINT [PK_ProcessingLog_1] PRIMARY KEY CLUSTERED \n(\n\t[TableId] ASC,\n\t[PipelineRunId] ASC\n)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]\n) ON [PRIMARY]\n\nCREATE UNIQUE NONCLUSTERED INDEX [IX_ProcessingControl_TableNames] ON [orchestration].[ProcessingControl]\n(\n\t[SourceSchema] ASC,\n\t[SourceTable] ASC\n)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, DROP_EXISTING = OFF, ONLINE = OFF, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY]\n'\n\n/****** Object: StoredProcedure [orchestration].[GetTablesToProcess] ******/\nIF OBJECT_ID('[orchestration].[GetTablesToProcess]', 'P') IS NULL\nEXEC sp_executesql @stmt = N'\nCREATE PROCEDURE [orchestration].[GetTablesToProcess]\n\t@TableGroup smallint\n\nAS\n\nSELECT PC.TableId,\n\tPC.SourceSchema,\n\tPC.SourceTable,\n\tPC.TargetSchema,\n\tPC.TargetTable,\n\tPC.KeyColumns,\n\tISNULL(PC.IsIncremental, 0) AS IsIncremental,\n\tISNULL(MAX(PL.HighWatermark), ''1900-01-01'') AS LowWatermark,\n\tGETUTCDATE() AS HighWatermark,\n\tCAST(CASE WHEN PC.IsIncremental = 1 AND PK.CONSTRAINT_NAME IS NULL THEN 1 ELSE 0 END AS BIT) AS NeedsPrimaryKey\nFROM orchestration.ProcessingControl PC\n\tLEFT OUTER JOIN orchestration.ProcessingLog PL\n\t\tON PC.TableId= PL.TableId\n\t\tAND PL.ProcessingEnded IS NOT NULL\n\t\tAND PL.IsSuccessful = 1\n\tLEFT OUTER JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS PK\n\t\tON PC.TargetSchema = PK.TABLE_SCHEMA\n\t\tAND PC.TargetTable = PK.TABLE_NAME\n\t\tAND PK.CONSTRAINT_TYPE = ''Primary Key''\nWHERE 
PC.TableGroup = @TableGroup\n\tAND PC.IsEnabled = 1\nGROUP BY PC.TableId,\n\tPC.SourceSchema,\n\tPC.SourceTable,\n\tPC.TargetSchema,\n\tPC.TargetTable,\n\tPC.KeyColumns,\n\tPC.IsIncremental,\n\tPK.CONSTRAINT_NAME\n'\n\n/****** Object: StoredProcedure [orchestration].[GeneratePrimaryKey] ******/\nIF OBJECT_ID('[orchestration].[GeneratePrimaryKey]', 'P') IS NULL\nEXEC sp_executesql @stmt = N'\nCREATE OR ALTER PROCEDURE [orchestration].[GeneratePrimaryKey]\n\t@Schema nvarchar(128), \n\t@Table nvarchar(128), \n\t@KeyColumns nvarchar(MAX)\n\nAS\n\n\n--Make sure that a Primary Key constraint does not exist on the specified table\nIF NOT EXISTS (\n\tSELECT * \n\tFROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS\n\tWHERE CONSTRAINT_TYPE = ''Primary Key''\n\tAND TABLE_SCHEMA = @Schema\n\tAND TABLE_NAME = @Table\n)\n\nBEGIN\n\t--Parse and reformat the list of key columns in a manner that will be suitable for subsequent processing\n\tDECLARE @KeyColumnsFormatted nvarchar(MAX) \n\tSELECT @KeyColumnsFormatted = STRING_AGG(QUOTENAME([value]), '','') FROM STRING_SPLIT(@KeyColumns, '','')\n\n\tDECLARE @SQL nvarchar(MAX) = ''''\n\n\t--Make key columns non-nullable\n\tSELECT @SQL = @SQL + ''ALTER TABLE [''+ @Schema + ''].['' + @Table + ''] ALTER COLUMN ['' + name + ''] '' + system_type_name + '' NOT NULL; ''\n\tFROM sys.dm_exec_describe_first_result_set(''SELECT '' + @KeyColumnsFormatted + '' FROM [''+ @Schema + ''].['' + @Table + '']'' , NULL, NULL)\n\tEXEC sp_executesql @stmt = @SQL\n\t\n\t--Add primary key\n\tSET @SQL = ''ALTER TABLE [''+ @Schema + ''].['' + @Table + ''] ADD CONSTRAINT [PK_'' + @Schema + ''_'' + @Table + ''] PRIMARY KEY CLUSTERED ('' + @KeyColumnsFormatted + '')''\n\tEXEC sp_executesql @stmt = @SQL\nEND\n'\n\n/****** Object: StoredProcedure [orchestration].[SchemaDriftHandler] ******/\nIF OBJECT_ID('[orchestration].[SchemaDriftHandler]', 'P') IS NULL\nEXEC sp_executesql @stmt = N'\nCREATE OR ALTER PROCEDURE [orchestration].[SchemaDriftHandler] \n\t@Schema nvarchar(128), \n\t@Table nvarchar(128), \n\t@ColumnDefinitions nvarchar(MAX)\n\nAS\n\nDECLARE @AddSQL nvarchar(MAX), @AlterSQL nvarchar(MAX)\n\n--Parse the column definition parameter passed from the source, identify columns that do not exist in the target, and generate a SQL statement for adding the columns\nIF OBJECT_ID(''[''+ @Schema + ''].['' + @Table + '']'', ''U'') IS NOT NULL\nBEGIN\n\tSELECT @AddSQL = ''ALTER TABLE [''+ @Schema + ''].['' + @Table + ''] ADD '' + STRING_AGG(CAST(QUOTENAME([Source].ColumnName) + '' '' + [Source].Type AS nvarchar(MAX)), '', '')\n\tFROM OPENJSON(@ColumnDefinitions)\n\t\tWITH (\n\t\t\tColumnName varchar(128) ''$.ColumnName'',\n\t\t\tType varchar(128) ''$.Type''\n\t\t) AS [Source]\n\t\tLEFT JOIN INFORMATION_SCHEMA.COLUMNS AS [Target]\n\t\t\tON [Target].TABLE_SCHEMA = @Schema\n\t\t\tAND [Target].TABLE_NAME = @Table\n\t\t\tAND [Source].ColumnName = [Target].COLUMN_NAME\n\tWHERE [Target].COLUMN_NAME IS NULL\nEND\n\n--If the SQL statement is not null and the target table exists, execute the SQL statement\nIF @AddSQL IS NOT NULL AND OBJECT_ID(''[''+ @Schema + ''].['' + @Table + '']'', ''U'') IS NOT NULL\nBEGIN\n\tEXEC sp_executesql @stmt = @AddSQL\nEND\n\n--Update the data type of the SyncedToSqlDbOn column added to the destination tables\nSELECT @AlterSQL = ''ALTER TABLE [''+ @Schema + ''].['' + @Table + ''] ALTER COLUMN SyncedToSqlDbOn DateTime2''\n\tFROM INFORMATION_SCHEMA.COLUMNS AS [Target]\n\tWHERE [Target].TABLE_SCHEMA = @Schema\n\t\tAND [Target].TABLE_NAME = @Table\n\t\tAND [Target].COLUMN_NAME = 
''SyncedToSqlDbOn''\n\t\tAND [Target].DATA_TYPE <> ''datetime2''\n\n--If the SQL statement is not null\nIF @AlterSQL IS NOT NULL\nBEGIN\n\tEXEC sp_executesql @stmt = @AlterSQL\nEND\n'"}]}}],"policy":{"elapsedTimeMetric":{},"cancelAfter":{}},"parameters":{"AutoPopulateListOfTablesToSync":{"type":"bool","defaultValue":true},"AutoGeneratePrimaryKeys":{"type":"bool","defaultValue":true},"TableGroupToSync":{"type":"int","defaultValue":1},"UsePartitionedTables":{"type":"bool","defaultValue":false},"DefaultTargetSchema":{"type":"string","defaultValue":"dbo"}},"variables":{"Structure":{"type":"Array"}},"folder":{"name":"Dataverse - Synapse Serverless to SQLDB"},"annotations":[],"lastPublishTime":"2022-09-16T13:55:00Z"},"dependsOn":["[concat(variables('workspaceId'), '/datasets/DataverseSQLDB')]","[concat(variables('workspaceId'), '/pipelines/Handle Schema Drift')]","[concat(variables('workspaceId'), '/pipelines/Populate List of Tables to Sync')]","[concat(variables('workspaceId'), '/pipelines/Table Full Copy')]","[concat(variables('workspaceId'), '/pipelines/Table Incremental Copy')]"]},{"name":"[concat(parameters('workspaceName'), '/DataverseSQLDB')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('DestinationSQLDB')]","type":"LinkedServiceReference"},"parameters":{"SchemaName":{"type":"string"},"TableName":{"type":"string"}},"annotations":[],"type":"AzureSqlTable","schema":[],"typeProperties":{"schema":{"value":"@dataset().SchemaName","type":"Expression"},"table":{"value":"@dataset().TableName","type":"Expression"}}},"dependsOn":[]},{"name":"[concat(parameters('workspaceName'), '/Handle Schema Drift')]","type":"Microsoft.Synapse/workspaces/pipelines","apiVersion":"2019-06-01-preview","properties":{"description":"Detect schema drift and, if necessary, add new columns to the target table.","activities":[{"name":"Execute stored procedure","description":"Execute the orchestration.SchemaDriftAddColumns procedure","type":"Script","dependsOn":[{"activity":"For each column","dependencyConditions":["Succeeded"]}],"policy":{"timeout":"7.00:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"linkedServiceName":{"referenceName":"[parameters('DestinationSQLDB')]","type":"LinkedServiceReference"},"typeProperties":{"scripts":[{"type":"Query","text":{"value":"@concat('EXEC [orchestration].[SchemaDriftHandler] \n\t@Schema = ''', pipeline().parameters.TargetSchema,''',\n\t@Table = ''',pipeline().parameters.TargetTable,''',\n\t@ColumnDefinitions =''', string(variables('ColumnDefinitions')), '''')","type":"Expression"}}]}},{"name":"Get source table definition","type":"Lookup","dependsOn":[],"policy":{"timeout":"7.00:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"AzureSqlSource","sqlReaderQuery":{"value":"@concat('EXEC sp_describe_first_result_set @tsql = N''SELECT TOP 1 * FROM [',pipeline().parameters.SourceSchema,'].[',pipeline().parameters.SourceTable,']''')","type":"Expression"},"queryTimeout":"02:00:00","partitionOption":"None"},"dataset":{"referenceName":"DataverseSynapseServerlessDB","type":"DatasetReference","parameters":{"TableName":"n/a","SchemaName":"n/a"}},"firstRowOnly":false}},{"name":"For each column","description":"Iterate over each column in the source table","type":"ForEach","dependsOn":[{"activity":"Get source table 
definition","dependencyConditions":["Succeeded"]}],"userProperties":[],"typeProperties":{"items":{"value":"@activity('Get source table definition').output.value","type":"Expression"},"batchCount":50,"activities":[{"name":"Append column definition","description":"Append column name and data type to an array variable","type":"AppendVariable","dependsOn":[],"userProperties":[],"typeProperties":{"variableName":"ColumnDefinitions","value":{"value":"@json(concat('{\"ColumnName\":\"', item().name, '\", \"Type\":\"', item().system_type_name, '\"}'))","type":"Expression"}}}]}}],"policy":{"elapsedTimeMetric":{},"cancelAfter":{}},"parameters":{"SourceSchema":{"type":"string"},"SourceTable":{"type":"string"},"TargetSchema":{"type":"string"},"TargetTable":{"type":"string"}},"variables":{"Structure":{"type":"Array"},"ColumnDefinitions":{"type":"Array"}},"folder":{"name":"Dataverse - Synapse Serverless to SQLDB"},"annotations":[],"lastPublishTime":"2022-09-16T13:54:37Z"},"dependsOn":["[concat(variables('workspaceId'), '/datasets/DataverseSynapseServerlessDB')]"]},{"name":"[concat(parameters('workspaceName'), '/Populate List of Tables to Sync')]","type":"Microsoft.Synapse/workspaces/pipelines","apiVersion":"2019-06-01-preview","properties":{"description":"Identify tables in the source database that appear to be suitable for incremental or full copy behavior and load them into the orchestration.ProcessingControl table.\n\nNote: review the list of tables in the orchestration.ProcessingControl table to ensure that the list matches your requirements.","activities":[{"name":"Copy list of tables to process","description":"Identify tables in the source database that have the suffixes of \"_partitioned\" and certain metadata tables, determine whether they appear to be suitable for incremental or full copy behavior and upsert them into the orchestration.ProcessingControl table.","type":"Copy","dependsOn":[],"policy":{"timeout":"7.00:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"AzureSqlSource","sqlReaderQuery":{"value":"@replace(replace('--Specify whether the solution should use the \"_partitioned\" tables as its source\nDECLARE @UsePartitionedTables BIT = UsePartitionedTablesParameter\n\n--Get a list of tables with the suffixes of \"_partitioned\" and certain metadata tables from the dbo schema\n; WITH DiscoveredTables AS (\nSELECT T.TABLE_SCHEMA AS SourceSchema,\n\tT.TABLE_NAME AS SourceTable,\n\tIIF(T.TABLE_NAME LIKE ''%_partitioned'', LEFT(T.TABLE_NAME, LEN(T.TABLE_NAME)-12), T.TABLE_NAME) AS TargetTable,\n\t--If the table contains columns Id, IsDeleted, SinkModifiedOn and versionnumber treat it as an incrementally-updated data table\n\tIIF(SUM(IIF(C.COLUMN_NAME IN (''Id'', ''IsDelete'', ''SinkModifiedOn'', ''versionnumber''), 1, 0)) = 4, 1, 0) AS IsIncrementalDataTable\nFROM INFORMATION_SCHEMA.TABLES T\n\tJOIN INFORMATION_SCHEMA.COLUMNS C\n\t\tON T.TABLE_SCHEMA= C.TABLE_SCHEMA\n\t\tAND T.TABLE_NAME = C.TABLE_NAME\nWHERE T.TABLE_SCHEMA = ''dbo''\n\tAND (\n\t\t(@UsePartitionedTables = 0 AND T.TABLE_NAME NOT LIKE ''%_partitioned'')\n\t\tOR (@UsePartitionedTables = 1 AND T.TABLE_NAME LIKE ''%_partitioned'')\n\t\tOR T.TABLE_NAME IN (''StateMetadata'', ''StatusMetadata'', ''TargetMetadata'', ''OptionsetMetadata'', ''GlobalOptionsetMetadata'')\n\t\t)\nGROUP BY T.TABLE_SCHEMA,\n\tT.TABLE_NAME\n)\n\nSELECT T.SourceSchema,\n\tT.SourceTable,\n\t''TargetSchemaParameter'' AS TargetSchema,\n\tT.TargetTable,\n\t--If the table 
contains columns Id and IsDeleted, treat it as an incrementally-updated table with a KeyColumnName of Id\n\t--Otherwise, assume that it requires a full refresh\n\tCASE WHEN \n\t\tT.IsIncrementalDataTable = 1 THEN ''Id'' \n\t\tWHEN T.SourceTable = ''GlobalOptionsetMetadata'' THEN ''GlobalOptionSetName,Option,LocalizedLabelLanguageCode'' \n\t\tWHEN T.SourceTable = ''OptionsetMetadata'' THEN ''EntityName,OptionSetName,Option,LocalizedLabelLanguageCode'' \n\t\tWHEN T.SourceTable = ''StateMetadata'' THEN ''EntityName,State,LocalizedLabelLanguageCode'' \n\t\tWHEN T.SourceTable = ''StatusMetadata'' THEN ''EntityName,Status,LocalizedLabelLanguageCode'' \n\t\tWHEN T.SourceTable = ''TargetMetadata'' THEN ''EntityName,AttributeName,ReferencedEntity,ReferencedAttribute'' \n\t\tEND AS KeyColumns,\n\tCAST(IIF(T.IsIncrementalDataTable = 1, 1, 0) AS BIT) AS IsIncremental\nFROM DiscoveredTables T\nWHERE T.IsIncrementalDataTable = 1 OR T.SourceTable IN (''StateMetadata'', ''StatusMetadata'', ''TargetMetadata'', ''OptionsetMetadata'', ''GlobalOptionsetMetadata'')\n', 'UsePartitionedTablesParameter', if(pipeline().parameters.UsePartitionedTables, '1', '0')), 'TargetSchemaParameter', pipeline().parameters.TargetSchema)","type":"Expression"},"queryTimeout":"02:00:00","partitionOption":"None"},"sink":{"type":"AzureSqlSink","writeBehavior":"upsert","upsertSettings":{"useTempDB":false,"keys":["SourceSchema","SourceTable"],"interimSchemaName":"staging"},"sqlWriterUseTableLock":false,"disableMetricsCollection":false},"enableStaging":false,"translator":{"type":"TabularTranslator","typeConversion":true,"typeConversionSettings":{"allowDataTruncation":true,"treatBooleanAsNumber":false}}},"inputs":[{"referenceName":"DataverseSynapseServerlessDB","type":"DatasetReference","parameters":{"TableName":"n/a","SchemaName":"n/a"}}],"outputs":[{"referenceName":"DataverseSQLDB","type":"DatasetReference","parameters":{"SchemaName":"orchestration","TableName":"ProcessingControl"}}]}],"policy":{"elapsedTimeMetric":{},"cancelAfter":{}},"parameters":{"UsePartitionedTables":{"type":"bool","defaultValue":true},"TargetSchema":{"type":"string","defaultValue":"dbo"}},"folder":{"name":"Dataverse - Synapse Serverless to SQLDB"},"annotations":[],"lastPublishTime":"2022-09-16T13:54:44Z"},"dependsOn":["[concat(variables('workspaceId'), '/datasets/DataverseSynapseServerlessDB')]","[concat(variables('workspaceId'), '/datasets/DataverseSQLDB')]"]},{"name":"[concat(parameters('workspaceName'), '/Table Full Copy')]","type":"Microsoft.Synapse/workspaces/pipelines","apiVersion":"2019-06-01-preview","properties":{"description":"Perform a full copy of a table (truncate and reload the destination table)","activities":[{"name":"Copy and Upsert Data","description":"Retrieve all data from the source table and upsert it into the destination table. Create a new table (if it does not exist). Log the start of a sync process.\n\nNote: the read behavior involves reading all data from source. 
The write behavior is an upsert.","type":"Copy","dependsOn":[],"policy":{"timeout":"7.00:00:00","retry":3,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"AzureSqlSource","additionalColumns":[{"name":"SyncedToSqlDbOn","value":{"value":"@pipeline().TriggerTime","type":"Expression"}}],"queryTimeout":"02:00:00","partitionOption":"None"},"sink":{"type":"AzureSqlSink","preCopyScript":{"value":"@{concat('INSERT INTO [orchestration].[ProcessingLog]\n ([TableId],\n [PipelineRunId],\n [ProcessingStarted])\n VALUES\n (''', string(pipeline().parameters.TableId), ''',\n ''', pipeline().RunId, ''',\n ''', string(pipeline().TriggerTime), ''')\n ')}","type":"Expression"},"writeBehavior":"upsert","upsertSettings":{"useTempDB":true,"keys":{"value":"@split(pipeline().parameters.KeyColumns, ',')","type":"Expression"}},"sqlWriterUseTableLock":false,"tableOption":"autoCreate","disableMetricsCollection":false},"enableStaging":false,"translator":{"type":"TabularTranslator","typeConversion":true,"typeConversionSettings":{"allowDataTruncation":true,"treatBooleanAsNumber":false}}},"inputs":[{"referenceName":"DataverseSynapseServerlessDB","type":"DatasetReference","parameters":{"TableName":{"value":"@pipeline().parameters.SourceTable","type":"Expression"},"SchemaName":{"value":"@pipeline().parameters.SourceSchema","type":"Expression"}}}],"outputs":[{"referenceName":"DataverseSQLDB","type":"DatasetReference","parameters":{"SchemaName":{"value":"@pipeline().parameters.TargetSchema","type":"Expression"},"TableName":{"value":"@pipeline().parameters.TargetTable","type":"Expression"}}}]},{"name":"Log successful processing end","description":"Update a log record to indicate successful completion of the sync process.","type":"Script","dependsOn":[{"activity":"Copy and Upsert Data","dependencyConditions":["Completed"]}],"policy":{"timeout":"7.00:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"linkedServiceName":{"referenceName":"[parameters('DestinationSQLDB')]","type":"LinkedServiceReference"},"typeProperties":{"scripts":[{"type":"Query","text":{"value":"@concat('\nUPDATE orchestration.ProcessingLog\nSET ProcessingEnded = GETUTCDATE(),\n IsSuccessful = ', string(if(equals(activity('Copy and Upsert Data').output.executionDetails[0].status, 'Succeeded'), 1, 0)), ',\n RowsCopied = ', string(activity('Copy and Upsert Data').output.rowsCopied), '\nWHERE TableId = ', string(pipeline().parameters.TableId),' \n AND PipelineRunId = ''', pipeline().RunId, '''')","type":"Expression"}}]}},{"name":"Fail pipeline","description":"Fail pipeline execution in the event of copy activity failure.","type":"Fail","dependsOn":[{"activity":"Log successful processing end","dependencyConditions":["Completed"]},{"activity":"Copy and Upsert Data","dependencyConditions":["Failed"]}],"userProperties":[],"typeProperties":{"message":"1000","errorCode":"Failed pipeline execution due to copy activity failure."}}],"policy":{"elapsedTimeMetric":{},"cancelAfter":{}},"parameters":{"SourceSchema":{"type":"string","defaultValue":"dbo"},"SourceTable":{"type":"string"},"TargetSchema":{"type":"string","defaultValue":"dbo"},"TargetTable":{"type":"string"},"TableId":{"type":"int"},"KeyColumns":{"type":"string"}},"folder":{"name":"Dataverse - Synapse Serverless to SQLDB"},"annotations":[],"lastPublishTime":"2022-09-16T13:54:51Z"},"dependsOn":["[concat(variables('workspaceId'), 
'/datasets/DataverseSynapseServerlessDB')]","[concat(variables('workspaceId'), '/datasets/DataverseSQLDB')]"]},{"name":"[concat(parameters('workspaceName'), '/Table Incremental Copy')]","type":"Microsoft.Synapse/workspaces/pipelines","apiVersion":"2019-06-01-preview","properties":{"description":"Perform an incremental copy of a table (upsert)","activities":[{"name":"Copy and Upsert Data","description":"Retrieve and de-duplicate data from the source table and upsert it into the destination table. Create the destination table if it does not exist. Log the start of a sync process.","type":"Copy","dependsOn":[],"policy":{"timeout":"7.00:00:00","retry":3,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"typeProperties":{"source":{"type":"AzureSqlSource","additionalColumns":[{"name":"SyncedToSqlDbOn","value":{"value":"@pipeline().TriggerTime","type":"Expression"}}],"sqlReaderQuery":{"value":"@replace(replace(replace(replace(replace('\n--Get a set of latest records from a table modified between specific watermark dates\n\n; WITH CurrentRecord AS (\nSELECT [KeyColumn], \n\tMAX(versionnumber) AS versionnumber,\n\tMAX(SinkModifiedOn) AS SinkModifiedOn\nFROM [SourceSchema].[SourceTable]\nWHERE SinkModifiedOn >= ''1900-01-01'' \n\tAND SinkModifiedOn < ''2999-12-31''\nGROUP BY [KeyColumn]\n)\n\nSELECT BaseTable.*\nFROM [SourceSchema].[SourceTable] AS BaseTable\n\tJOIN CurrentRecord\n\t\tON BaseTable.[KeyColumn] = CurrentRecord.[KeyColumn]\n\t\tAND BaseTable.versionnumber = CurrentRecord.versionnumber\n\t\tAND BaseTable.SinkModifiedOn = CurrentRecord.SinkModifiedOn\n', \n'SourceSchema', pipeline().parameters.SourceSchema),\n'SourceTable', pipeline().parameters.SourceTable),\n\n'1900-01-01', pipeline().parameters.LowWatermark),\n'2099-12-31', pipeline().parameters.HighWatermark),\n'KeyColumn', pipeline().parameters.KeyColumn)","type":"Expression"},"queryTimeout":"02:00:00","partitionOption":"None"},"sink":{"type":"AzureSqlSink","preCopyScript":{"value":"@{concat('INSERT INTO [orchestration].[ProcessingLog]\n ([TableId],\n [PipelineRunId],\n [ProcessingStarted],\n [LowWatermark],\n [HighWatermark])\n VALUES\n (''', string(pipeline().parameters.TableId), ''',\n ''', pipeline().RunId, ''',\n ''', string(pipeline().TriggerTime), ''',\n ''', pipeline().parameters.LowWatermark, ''',\n ''', pipeline().parameters.HighWatermark, ''')')}","type":"Expression"},"writeBehavior":"upsert","upsertSettings":{"useTempDB":false,"keys":{"value":"@split(pipeline().parameters.KeyColumn, ',')","type":"Expression"},"interimSchemaName":"staging"},"sqlWriterUseTableLock":false,"tableOption":"autoCreate","disableMetricsCollection":false},"enableStaging":false,"translator":{"type":"TabularTranslator","typeConversion":true,"typeConversionSettings":{"allowDataTruncation":true,"treatBooleanAsNumber":false}}},"inputs":[{"referenceName":"DataverseSynapseServerlessDB","type":"DatasetReference","parameters":{"TableName":"n/a","SchemaName":"n/a"}}],"outputs":[{"referenceName":"DataverseSQLDB","type":"DatasetReference","parameters":{"SchemaName":{"value":"@pipeline().parameters.TargetSchema","type":"Expression"},"TableName":{"value":"@pipeline().parameters.TargetTable","type":"Expression"}}}]},{"name":"Log processing end","description":"Update a log record to indicate successful completion of the sync process.","type":"Script","dependsOn":[{"activity":"Copy and Upsert 
Data","dependencyConditions":["Completed"]}],"policy":{"timeout":"7.00:00:00","retry":0,"retryIntervalInSeconds":30,"secureOutput":false,"secureInput":false},"userProperties":[],"linkedServiceName":{"referenceName":"[parameters('DestinationSQLDB')]","type":"LinkedServiceReference"},"typeProperties":{"scripts":[{"type":"Query","text":{"value":"@concat('\nUPDATE orchestration.ProcessingLog\nSET ProcessingEnded = GETUTCDATE(),\n IsSuccessful = ', string(if(equals(activity('Copy and Upsert Data').output.executionDetails[0].status, 'Succeeded'), 1, 0)), ',\n RowsCopied = ', string(activity('Copy and Upsert Data').output.rowsCopied), '\nWHERE TableId = ', string(pipeline().parameters.TableId),' \n AND PipelineRunId = ''', pipeline().RunId, '''')","type":"Expression"}}]}},{"name":"Fail pipeline","description":"Fail pipeline execution in the event of copy activity failure.","type":"Fail","dependsOn":[{"activity":"Copy and Upsert Data","dependencyConditions":["Failed"]},{"activity":"Log processing end","dependencyConditions":["Completed"]}],"userProperties":[],"typeProperties":{"message":"1000","errorCode":"Failed pipeline execution due to copy activity failure."}}],"policy":{"elapsedTimeMetric":{},"cancelAfter":{}},"parameters":{"SourceSchema":{"type":"string","defaultValue":"dbo"},"SourceTable":{"type":"string"},"TargetSchema":{"type":"string","defaultValue":"dbo"},"TargetTable":{"type":"string"},"LowWatermark":{"type":"string","defaultValue":"1900-01-01"},"HighWatermark":{"type":"string","defaultValue":"2999-12-31"},"KeyColumn":{"type":"string","defaultValue":"Id"},"TableId":{"type":"int"},"GeneratePrimaryKey":{"type":"bool"}},"folder":{"name":"Dataverse - Synapse Serverless to SQLDB"},"annotations":[],"lastPublishTime":"2022-09-16T13:54:55Z"},"dependsOn":["[concat(variables('workspaceId'), '/datasets/DataverseSynapseServerlessDB')]","[concat(variables('workspaceId'), '/datasets/DataverseSQLDB')]"]},{"name":"[concat(parameters('workspaceName'), '/DataverseSynapseServerlessDB')]","type":"Microsoft.Synapse/workspaces/datasets","apiVersion":"2019-06-01-preview","properties":{"linkedServiceName":{"referenceName":"[parameters('SourceSynapseServerlessDB')]","type":"LinkedServiceReference"},"parameters":{"TableName":{"type":"string"},"SchemaName":{"type":"string"}},"annotations":[],"type":"AzureSqlTable","schema":[],"typeProperties":{"schema":{"value":"@dataset().SchemaName","type":"Expression"},"table":{"value":"@dataset().TableName","type":"Expression"}}},"dependsOn":[]}]} \ No newline at end of file diff --git a/community templates/DataverseToAzureSQLDB/manifest.json b/community templates/DataverseToAzureSQLDB/manifest.json new file mode 100644 index 00000000..9e6cc59c --- /dev/null +++ b/community templates/DataverseToAzureSQLDB/manifest.json @@ -0,0 +1,48 @@ +{ + "name": "Sync Dataverse data from Synapse Serverless to Azure SQL DB", + "description": "Extend Azure Synapse Link for Dataverse to sync Dataverse data from a Synapse Serverless SQL database to Azure SQL DB. 
This template accelerates migration from the deprecated Data Export Service for Dataverse.", + "image": "LookupGet tables toprocessForEachFor each tableActivitiesExecuteSchema...SeparateFull Copy...Add aprimary...+If ConditionAuto-Populate Listof Tables to SyncTruePopulateList of...+False+ScriptAdd orchestrationobjects to target...", + "icons": [ + "Lookup", + "ForEach", + "IfCondition" + ], + "requires": { + "linkedservices": { + "DestinationSQLDB": { + "supportTypes": [ + "AzureSqlDatabase" + ] + }, + "SourceSynapseServerlessDB": { + "supportTypes": [ + "AzureSqlDatabase" + ] + } + } + }, + "contributorType": "Community", + "author": "Slava Trofimov", + "documentation": "https://github.com/Azure/Azure-DataFactory/blob/main/community%20templates/documentation/DataverseToAzureSQLDB/DataverseToAzureSQLDB.md", + "annotations": [ + "Synapse Link", + "Synapse Link for Dataverse", + "Copy", + "Dataverse", + "Azure SQL DB", + "Azure SQL Database", + "DES", + "Data Export Service", + "Dynamics" + ], + "services": [ + "Synapse Serverless SQL Pools", + "Azure SQL Database" + ], + "categories": [ + "Synapse Link", + "Copy", + "Synapse", + "Integration" + ] +} \ No newline at end of file diff --git a/community templates/documentation/DataverseToAzureSQLDB/DataverseToAzureSQLDB.md b/community templates/documentation/DataverseToAzureSQLDB/DataverseToAzureSQLDB.md new file mode 100644 index 00000000..81a1daf0 --- /dev/null +++ b/community templates/documentation/DataverseToAzureSQLDB/DataverseToAzureSQLDB.md @@ -0,0 +1,160 @@ +# Sync Dataverse data from Synapse Serverless to Azure SQL DB + Extend Azure Synapse Link for Dataverse by syncing Dataverse data to an Azure SQL DB. + +## Background and Problem Statement +[Microsoft Dataverse](https://docs.microsoft.com/en-us/power-apps/maker/data-platform/data-platform-intro) is a secure and versatile data storage and management layer for a variety of business applications, such as Microsoft Dynamics, Power Apps, Power Automate, etc. [Synapse Link for Dataverse](https://docs.microsoft.com/en-us/power-apps/maker/data-platform/export-to-data-lake) provides a seamless mechanism for continuously exporting Dataverse data to your Synapse Analytics Workspace for integration, analytics and business intelligence purposes. + +While data exported via Synapse Link for Dataverse can be accessed using Synapse Analytics Serverless SQL Pools, some customers may prefer to make Dataverse data available in an Azure SQL Database to support a broader range of data integration or data enrichment requirements. This scenario may be particularly relevant for customers who previously used the [Data Export Service (DES)](https://docs.microsoft.com/en-us/power-apps/developer/data-platform/data-export-service) to bring Dataverse data from their Microsoft Dynamics applications to their Azure SQL Database. + +The Data Export Service was deprecated in November 2021 and will reach its [end-of-support and end-of-life in November 2022](https://powerapps.microsoft.com/en-us/blog/do-more-with-data-from-data-export-service-to-azure-synapse-link-for-dataverse/). Synapse Link for Dataverse does not natively support an Azure SQL DB as the destination for data export. However, a flexible and scalable data integration solution can be implemented to copy Dataverse data from a Synapse Analytics Workspace to an Azure SQL database as a replacement for the Data Export Service. 
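+
+As noted above, data exported by Synapse Link for Dataverse can be queried directly from the Serverless SQL database in your Synapse Analytics Workspace. The snippet below is a minimal illustration only: it assumes a hypothetical `account` entity has been selected for export (entity names will vary with your configuration) and references the `Id`, `SinkModifiedOn` and `versionnumber` columns that the synchronization solution described below relies on:
+
+```SQL
+-- Illustrative query against the Serverless SQL database created by Synapse Link for Dataverse.
+-- [dbo].[account] is a hypothetical exported entity; substitute an entity from your own environment.
+SELECT TOP (10) [Id], [SinkModifiedOn], [versionnumber]
+FROM [dbo].[account]
+ORDER BY [SinkModifiedOn] DESC;
+```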
+ +## Solution Overview +This repository includes a solution accelerator for incrementally synchronizing Dataverse data from external tables in Synapse Serverless SQL Pool to an Azure SQL Database. The solution consists of Synapse Pipelines for data movement, as well as database objects for configuration and logging of the data integration process. This solution can be rapidly deployed to dynamically synchronize any number of tables between a Serverless SQL Pool and an Azure SQL DB. + +The architecture of this solution is depicted in the following diagram: + +![Solution Architecture](Images/SolutionArchitecture.png) + +This solution supports copying tables either incrementally or in full. Most tables will be loaded incrementally. However, certain metadata tables, such as StateMetadata, StatusMetadata, OptionsetMetadata, GlobalOptionsetMetadata and TargetMetadata, should be loaded in full (since they tend to be small and do not have relevant columns to indicate when specific records have been modified). + +The solution keeps track of integration pipeline runs for each table and retrieves only those records that have been added, changed or soft-deleted since the latest successful pipeline run. + +In order to facilitate incremental copying of data, this solution requires table export in Synapse Link for Dataverse to be configured in *append-only mode* as described in the following [article](https://docs.microsoft.com/en-us/power-apps/maker/data-platform/azure-synapse-link-advanced-configuration#in-place-updates-vs-append-only-writes). Hence, data available in the Serverless SQL Pool may contain multiple versions of any given record, as that record evolves over time. However, this solution deduplicates the data in such a way that the destination Azure SQL Database contains only the latest version of any given record, which simplifies the consumption of the data. + +Deleted records in incrementally copied tables are handled as soft deletes by adding a flag to the IsDelete column. If desired, database views can be added in the target SQL database to filter out the deleted records. + +For scenarios requiring lower latencies (such as near-real-time access to data), the solution can be configured not to use the "_partitioned" tables based on hourly snapshots, but rather regular tables that are receiving data from Synapse Link on an ongoing basis. + +This solution automatically handles common types of schema evolution in the source system. Synapse Link for Dataverse will automatically accommodate newly added columns (as documented [here](https://docs.microsoft.com/en-us/power-apps/maker/data-platform/export-data-lake-faq)). This solution will detect newly-added columns during each pipeline execution and will add corresponding columns to the tables in the target database. Columns deleted from the source system will remain in the target database, but will no longer be updated. + +This solution will automatically create several helper objects in the target Azure SQL Database during the first synchronization. These objects will support configuration and logging of the data integration process. Following is the list of helper database objects: + +- Schemas: orchestration and staging +- Tables: orchestration.ProcessingControl and orchestration.ProcessingLog +- Stored Procedures: orchestration.GetTablesToProcess, orchestration.GeneratePrimaryKey and orchestration.SchemaDriftHandler. + +## Implementation +### Prerequisites +1. 
You have [provisioned a Synapse Analytics Workspace](https://docs.microsoft.com/en-us/azure/synapse-analytics/quickstart-create-workspace). +1. You have [configured Synapse Link for Dataverse to export relevant entities to a Synapse Analytics Workspace](https://docs.microsoft.com/en-us/power-apps/maker/data-platform/azure-synapse-link-synapse). This solution requires table export to be configured in *append-only mode* as described in the following [article](https://docs.microsoft.com/en-us/power-apps/maker/data-platform/azure-synapse-link-advanced-configuration#in-place-updates-vs-append-only-writes). +1. You have sufficient read access to the lake database created by Synapse Link for Dataverse and to the underlying Data Lake storage account. +1. You have administrative access to the Synapse Analytics Workspace. +1. You have administrative access to the target Azure SQL DB. + +### Installation +1. Provision an Azure SQL Database to serve as the target of your Dataverse data. [See documentation](https://docs.microsoft.com/en-us/azure/azure-sql/database/single-database-create-quickstart?view=azuresql&tabs=azure-portal). Ensure that the "Allow Azure services and resources to access this server" setting is enabled. [See documentation](https://docs.microsoft.com/en-us/azure/azure-sql/database/firewall-configure?view=azuresql) + +2. Grant your Synapse Analytics Workspace access to your target Azure SQL Database by adding the managed identity of your Synapse Analytics Workspace to the db_owner role in the Azure SQL Database. [See additional documentation](https://docs.microsoft.com/en-us/azure/data-factory/connector-azure-sql-database?tabs=data-factory#managed-identity). You may use the following SQL statement: + +```SQL +CREATE USER [YourSynapseAnalyticsWorkspaceName] FROM EXTERNAL PROVIDER +GO +ALTER ROLE [db_owner] ADD MEMBER [YourSynapseAnalyticsWorkspaceName] +GO +``` +3. Ensure that the managed identity of your Synapse Analytics Workspace has access to the storage account container where your Dataverse data is stored by adding it to the *Storage Blob Data Reader* role. [See additional documentation](https://docs.microsoft.com/en-us/azure/synapse-analytics/security/how-to-grant-workspace-managed-identity-permissions). + +4. Use "Browse gallery" to find the pipeline template corresponding to this solution. Then, import the template from the gallery: + +![Import pipelines from template](Images/ImportPipelineFromTemplate.png) + +5. During the template import process, configure linked services for the target Azure SQL DB and the source Serverless SQL Pool endpoint (as illustrated below). + +![Configure linked services](Images/ConfigureLinkedServices.png) + +6. Note: while configuring linked services for the source and target databases, please manually specify the fully-qualified domain name of your SQL endpoint and database name as illustrated below: + +![Configure linked services - detail](Images/ConfigureLinkedServices-detail.png) + +7. Once the import process completes, you will find five pipelines in the *Dataverse - Synapse Serverless to SQLDB* folder, as illustrated below: + +![Imported pipelines](Images/ImportedPipelines.png) + +8. Click on the *Publish All* button in the header of your Synapse Analytics Workspace to publish the pipelines and related artifacts to your Synapse Workspace. + +### Configure and test your pipeline +**Sync Orchestration** is the main pipeline responsible for orchestrating the data synchronization process. 
To test the data synchronization process, use the *Trigger now* feature for the *Sync Orchestration* pipeline, as illustrated below. + +![Trigger orchestration pipeline now](Images/TriggerNow.png) + +While triggering the execution of the pipeline, you will be prompted to configure several parameters that allow you to customize its behavior to meet your needs. + +> - AutoPopulateListOfTablesToSync +> - AutoGeneratePrimaryKeys +> - TableGroupToSync +> - UsePartitionedTables +> - DefaultTargetSchema + +1. **AutoPopulateListOfTablesToSync** (default: true): by default, the solution is designed to perform automatic discovery of tables available in the source Serverless SQL database. Any table that appears to contain Dataverse data, as well as certain metadata tables (StateMetadata, StatusMetadata, OptionsetMetadata, GlobalOptionsetMetadata and TargetMetadata), will be automatically added to the *orchestration.ProcessingControl* table. If you prefer to disable auto-discovery (perhaps after initially populating the list of desired tables), please set the *AutoPopulateListOfTablesToSync* parameter in the *Sync Orchestration* pipeline to *false*. Note, to maximize efficiency, it is recommended to disable automatic discovery of tables when configuring triggers that will be executed with high frequency (for near-real-time data access scenarios). + +1. **AutoGeneratePrimaryKeys** (default: true): by default, the solution will automatically add a clustered primary key index to each target table that does not have a primary key already. These primary keys will improve the efficiency of the incremental sync process. If you prefer to manage primary key creation manually, you may disable automatic primary key generation by setting the *AutoGeneratePrimaryKeys* parameter in the *Sync Orchestration* pipeline to *false*. + +1. **TableGroupToSync** (default: 1): by default, *all* source tables will be synced to the target database whenever the *Sync Orchestration* pipeline is executed. Yet, this may not be desirable if data latency requirements differ significantly across tables. When low latency of data is expected for some tables, these tables can be added to a designated Table Group and their synchronization can be triggered on a frequent schedule. Other tables can be added to a different Table Group and their synchronization can be triggered on a less frequent schedule. + + 1. All newly discovered tables will be added to Table Group 1 when they are added to the *orchestration.ProcessingControl* table. + 1. If desired, you may update records in the *orchestration.ProcessingControl* table to reassign specific tables to a separate Table Group (such as Table Group 2, 3, etc.) + 1. When triggering the execution of the *Sync Orchestration* pipeline, specify which Table Group should be synchronized by setting the value of the *TableGroupToSync* parameter. + +1. **UsePartitionedTables** (default: true): by default, this solution is configured to use external tables in the Serverless SQL Pool database that leverage [read-only hourly snapshots](https://docs.microsoft.com/en-us/power-apps/maker/data-platform/azure-synapse-link-synapse#access-near-real-time-data-and-read-only-snapshot-data-preview). + + 1. Using read-only snapshots helps to avoid read/write contention issues while reading data from files that are being written to by Synapse Link for Dataverse. Tables based on snapshot data in Serverless SQL Pool databases are labeled with the "_partitioned" suffix. 
Note, at the time of this writing, Synapse Link for Dataverse does not create snapshot data for empty tables. + + 1. For scenarios requiring lower latencies (such as near-real-time access to data), the solution can be configured not to use the "_partitioned" tables based on hourly snapshots, but rather regular tables that are receiving data from Synapse Link on an ongoing basis. You may do so by setting the UsePartitionedTables parameter to *false*. To support this configuration, your Synapse Link for Dataverse must satisfy the following requirements: + + 1. Synapse Link for Dataverse must have been configured after August 2022. External Tables created by Synapse Link for Dataverse after August 2022 are configured with the [ALLOW_INCONSISTENT_READS](https://learn.microsoft.com/en-us/azure/synapse-analytics/sql/create-use-external-tables#external-table-on-appendable-files) table option, which allows Serverless SQL Pools to read the data files while new records are being written to these files. + 1. Synapse Link for Dataverse must have been configured to export data in CSV format. At the time of this writing, Synapse Link for Dataverse implementations that export data in the Delta Lake format lack the soft-delete functionality that this synchronization solution requires. + +5. **DefaultTargetSchema** (default: dbo): by default, the solution will create destination tables in the *dbo* schema of the target database. If you prefer to create destination tables in a different schema, change the *DefaultTargetSchema* parameter to meet your requirements. + +> The *DefaultTargetSchema* you specify must exist in the target database. If necessary, create the desired schema. + +### Monitor synchronization process +After triggering the pipeline, you may monitor pipeline execution in the Synapse Analytics Workspace, as illustrated below. +![Monitor pipeline runs](Images/MonitorPipelineRuns.png) + +In addition, you may monitor the outcomes of the synchronization process by querying the *orchestration.ProcessingLog* table in the target database (see the example query below). + +Finally, you may examine the content of tables in the target database and identify individual rows that have been added/updated at a specific time by looking at the *SyncedToSqlDbOn* field that is added to each table by this solution. + +### Configure recurring triggers +Create a scheduled trigger to execute the *Sync Orchestration* pipeline in an automated manner on a desired schedule. As discussed in the previous section, you may create multiple triggers, using different *TableGroupToSync* parameter values to optimize the synchronization of groups of tables with different data latency requirements. + +Note, when the *UsePartitionedTables* parameter is set to true, pipeline execution should be scheduled no more frequently than once per hour (since underlying snapshots will be created on an hourly basis). +> **IMPORTANT!** Please ensure that the time interval between pipeline triggers is sufficiently long (compared to the duration of each pipeline execution) to prevent the possibility of multiple pipeline executions attempting to update the same group of tables at the same time. Simultaneous execution of the synchronization pipeline for the same group of tables may apply updates to certain records out of order, which could lead to data consistency problems! 
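+
+As a rough illustration, the following query (a sketch that relies only on the orchestration.ProcessingControl and orchestration.ProcessingLog helper tables created by this solution) returns the recent run history for each table, including durations and row counts. It can help you confirm that run durations are comfortably shorter than your trigger interval, and it doubles as a convenient way to review synchronization outcomes:
+
+```SQL
+-- Review recent synchronization runs per table (most recent first).
+SELECT PC.TargetSchema,
+       PC.TargetTable,
+       PC.TableGroup,
+       PL.PipelineRunId,
+       PL.ProcessingStarted,
+       PL.ProcessingEnded,
+       DATEDIFF(SECOND, PL.ProcessingStarted, PL.ProcessingEnded) AS DurationInSeconds,
+       PL.RowsCopied,
+       PL.IsSuccessful
+FROM orchestration.ProcessingLog PL
+    JOIN orchestration.ProcessingControl PC
+        ON PC.TableId = PL.TableId
+ORDER BY PL.ProcessingStarted DESC;
+```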
+ +>When attempting to execute the synchronization pipeline frequently (for near-real-time data access), consider using the [Tumbling Window trigger](https://learn.microsoft.com/en-us/azure/data-factory/how-to-create-tumbling-window-trigger?tabs=synapse-analytics%2Cazure-powershell) (rather than a Scheduled trigger) and set the [maxConcurrency property of the trigger to 1](https://learn.microsoft.com/en-us/azure/data-factory/how-to-create-tumbling-window-trigger?tabs=synapse-analytics%2Cazure-powershell#:~:text=No-,maxConcurrency,-The%20number%20of) (which will ensure that new trigger runs will be added to a queue but will not run in parallel). Note that you can have only one Tumbling Window trigger per pipeline--it is advisable to use it for the table group that requires the most frequent synchronization, while using Scheduled triggers with other table groups. + + +## Next Steps and Additional Considerations +* By default, all discovered tables will be included in the synchronization process. If you would like to exclude specific tables from synchronization, you may update records in the *orchestration.ProcessingControl* table and set the *IsEnabled* flag to 0 for any table that you do not wish to synchronize. + +* Consider creating non-clustered indexes to support specific query workloads that you anticipate. + +* Consider creating views on top of tables in the target SQL Database to hide deleted records and simplify data access from client applications. + +* Review performance and utilization of your target Azure SQL Database and adjust scale appropriately. + +* If necessary, adjust and refine Synapse Pipeline settings that control the performance of the data synchronization process, such as: + * [Performance of Copy Activities](https://learn.microsoft.com/en-us/azure/data-factory/copy-activity-performance) + * Batch count of the ["For Each" activity](https://learn.microsoft.com/en-us/azure/data-factory/control-flow-for-each-activity) + +* If the *AutoPopulateListOfTablesToSync* parameter in the *Sync Orchestration* pipeline is set to *true*, any new tables added to your Synapse Link for Dataverse will be automatically added to the ProcessingControl list and will be assigned to Table Group 1. If you prefer to disable the *AutoPopulateListOfTablesToSync* parameter, you may still manually add new tables to be synchronized by this solution at any time. Simply add the configuration details for the desired tables to the `orchestration.ProcessingControl` table. The solution will automatically create a table in the destination database and will perform a full load of the table during the next scheduled pipeline execution. Incremental synchronizations will continue in the future. + +* As previously discussed, common schema evolution scenarios (i.e., columns being added or deleted) are handled automatically. However, other potential scenarios, such as data type changes, will need to be handled manually. Consider the following approaches: + * Manually alter the definition of the table in the target SQL Database (recommended) + * Drop the table in the target SQL Database and delete all records in the orchestration.ProcessingLog table related to the affected table. The table will be re-added and fully loaded with data during the next scheduled synchronization. + +* This solution accelerator is designed to synchronize all columns from tables enabled for synchronization. 
In some scenarios, you may prefer to sync only a subset of columns from a given table (which may also improve the efficiency of the synchronization process). To accomplish that you may: + 1. Create a new Serverless SQL Database (within the same Synapse Analytics Workspace as the Lake Database created by Synapse Link for Dataverse). + 2. Create a set of cross-database views pointed to the tables in the Lake Database managed by Synapse Link for Dataverse. + 3. As part of view definitions, include the columns that you are interested in and omit the ones that are not relevant. Note, the following columns are required for incrementally-updated tables: *Id, IsDeleted, SinkModifiedOn and versionnumber*. + 4. The Linked Service in the Synapse Pipeline created by the solution accelerator will need to be pointed to the Serverless SQL Database that you have created (rather than the Lake Database created by Synapse Link for Dataverse). + +* If desired, consider hardening security settings of the implemented solution, which may include: + * Applying more restrictive firewall rules to the Azure SQL Server hosting your target database + * Granting more restrictive database permissions to the managed identity of your Synapse Analytics Workspace (as compared to the db_owner role suggested above) + + +## Acknowledgements +* Author: [Slava Trofimov](https://github.com/slavatrofimov) +* Special thanks to [Scott Sewell](https://github.com/mscottsewell) for solution testing and validation. \ No newline at end of file diff --git a/community templates/documentation/DataverseToAzureSQLDB/Images/ConfigureLinkedServices-detail.png b/community templates/documentation/DataverseToAzureSQLDB/Images/ConfigureLinkedServices-detail.png new file mode 100644 index 00000000..42e37fc2 Binary files /dev/null and b/community templates/documentation/DataverseToAzureSQLDB/Images/ConfigureLinkedServices-detail.png differ diff --git a/community templates/documentation/DataverseToAzureSQLDB/Images/ConfigureLinkedServices.png b/community templates/documentation/DataverseToAzureSQLDB/Images/ConfigureLinkedServices.png new file mode 100644 index 00000000..f789cf77 Binary files /dev/null and b/community templates/documentation/DataverseToAzureSQLDB/Images/ConfigureLinkedServices.png differ diff --git a/community templates/documentation/DataverseToAzureSQLDB/Images/ImportPipelineFromTemplate.png b/community templates/documentation/DataverseToAzureSQLDB/Images/ImportPipelineFromTemplate.png new file mode 100644 index 00000000..c79a2f24 Binary files /dev/null and b/community templates/documentation/DataverseToAzureSQLDB/Images/ImportPipelineFromTemplate.png differ diff --git a/community templates/documentation/DataverseToAzureSQLDB/Images/ImportedPipelines.png b/community templates/documentation/DataverseToAzureSQLDB/Images/ImportedPipelines.png new file mode 100644 index 00000000..38e7714a Binary files /dev/null and b/community templates/documentation/DataverseToAzureSQLDB/Images/ImportedPipelines.png differ diff --git a/community templates/documentation/DataverseToAzureSQLDB/Images/MonitorPipelineRuns.png b/community templates/documentation/DataverseToAzureSQLDB/Images/MonitorPipelineRuns.png new file mode 100644 index 00000000..6acd7ee8 Binary files /dev/null and b/community templates/documentation/DataverseToAzureSQLDB/Images/MonitorPipelineRuns.png differ diff --git a/community templates/documentation/DataverseToAzureSQLDB/Images/SolutionArchitecture.png b/community 
templates/documentation/DataverseToAzureSQLDB/Images/SolutionArchitecture.png new file mode 100644 index 00000000..c7a44089 Binary files /dev/null and b/community templates/documentation/DataverseToAzureSQLDB/Images/SolutionArchitecture.png differ diff --git a/community templates/documentation/DataverseToAzureSQLDB/Images/TriggerNow.png b/community templates/documentation/DataverseToAzureSQLDB/Images/TriggerNow.png new file mode 100644 index 00000000..399d562a Binary files /dev/null and b/community templates/documentation/DataverseToAzureSQLDB/Images/TriggerNow.png differ