diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableDataSourceAdapterTests.cs b/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableDataSourceAdapterTests.cs index ab1822e..518e2ed 100644 --- a/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableDataSourceAdapterTests.cs +++ b/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableDataSourceAdapterTests.cs @@ -1,4 +1,5 @@ -using Microsoft.DataTransfer.AzureTable.Source; +using Microsoft.DataTransfer.AzureTable.Resumption; +using Microsoft.DataTransfer.AzureTable.Source; using Microsoft.DataTransfer.Extensibility; using Microsoft.DataTransfer.TestsCommon; using Microsoft.DataTransfer.TestsCommon.Mocks; @@ -114,6 +115,46 @@ public async Task ReadEntitiesWithAllInternalFields_AllInternalPropertiesRead() await ReadAndVerifyFields(configuration, new[] { "RowKey", "PartitionKey", "Timestamp" }); } + [TestMethod, Timeout(120000)] + public async Task ResumeFunctionality_ReadFromTheCheckpoint() + { + var configuration = Mocks + .Of(c => + c.ConnectionString == Settings.AzureStorageConnectionString && + c.Table == tableName && + c.InternalFields == AzureTableInternalFields.None) + .First(); + + var checkpointItem = sampleData[100]; + var resumptionAdapter = new AzureTableResumptionAdaptor("checkpoint.json"); + var checkpoint = new AzureTablePrimaryKey() + { + PartitionKey = "", + RowKey = checkpointItem["id"].ToString() + }; + resumptionAdapter.SaveCheckpoint(checkpoint); + + var dataContext = new AzureTableDataTransferContextMock() + { + SinkName = "TestSink", + SourceName = "TestSource", + EnableResumeFunction = true, + RunConfigSignature = "checkpoint" + }; + + using (var adapter = await new AzureTableSourceAdapterFactory() + .CreateAsync(configuration, dataContext, CancellationToken.None)) + { + var readOutput = new ReadOutputByRef(); + var dataItem = await adapter.ReadNextAsync(readOutput, CancellationToken.None); + + foreach(var field 
in dataItem.GetFieldNames()) + { + Assert.AreEqual(checkpointItem[field].ToString(), dataItem.GetValue(field).ToString()); + } + } + } + private async Task> ReadData(IAzureTableSourceAdapterConfiguration configuration) { using (var adapter = await new AzureTableSourceAdapterFactory() diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableDataTransferContextMock.cs b/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableDataTransferContextMock.cs new file mode 100644 index 0000000..fe8ddb8 --- /dev/null +++ b/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableDataTransferContextMock.cs @@ -0,0 +1,15 @@ +using Microsoft.DataTransfer.Extensibility; + +namespace Microsoft.DataTransfer.AzureTable.FunctionalTests +{ + sealed class AzureTableDataTransferContextMock : IDataTransferContext + { + public string SourceName { get; set; } + + public string SinkName { get; set; } + + public string RunConfigSignature { get; set; } + + public bool EnableResumeFunction { get; set; } + } +} diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableHelper.cs b/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableHelper.cs index 22668b5..9198144 100644 --- a/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableHelper.cs +++ b/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/AzureTableHelper.cs @@ -15,7 +15,7 @@ public static void CreateTable(string connectionString, string tableName, IReadO TableBatchOperation batch = new TableBatchOperation(); foreach (var entity in data) { - batch.Insert(new DictionaryTableEntity(Guid.NewGuid().ToString(), entity)); + batch.Insert(new DictionaryTableEntity(entity["id"].ToString(), entity)); if (batch.Count >= 100) { diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/Microsoft.DataTransfer.AzureTable.FunctionalTests.csproj 
b/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/Microsoft.DataTransfer.AzureTable.FunctionalTests.csproj index 9137ac8..c59ec66 100644 --- a/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/Microsoft.DataTransfer.AzureTable.FunctionalTests.csproj +++ b/AzureTable/Microsoft.DataTransfer.AzureTable.FunctionalTests/Microsoft.DataTransfer.AzureTable.FunctionalTests.csproj @@ -116,6 +116,7 @@ Properties\CommonAssemblyInfo.cs + diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/ContinuationTokenParserTests.cs b/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/ContinuationTokenParserTests.cs new file mode 100644 index 0000000..1793ee5 --- /dev/null +++ b/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/ContinuationTokenParserTests.cs @@ -0,0 +1,18 @@ +using System; +using Microsoft.VisualStudio.TestTools.UnitTesting; + +namespace Microsoft.DataTransfer.AzureTable.UnitTests +{ + [TestClass] + public class ContinuationTokenParserTests + { + [TestMethod] + public void EncodeContinuationToken_StringEncoded() + { + var testString = "test"; + var encodedToken = ContinuationTokenParser.EncodeContinuationToken(testString); + + Assert.AreEqual("1!8!dGVzdA--", encodedToken, "The encoded token should be as expected."); + } + } +} diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/Microsoft.DataTransfer.AzureTable.UnitTests.csproj b/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/Microsoft.DataTransfer.AzureTable.UnitTests.csproj new file mode 100644 index 0000000..100e2a2 --- /dev/null +++ b/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/Microsoft.DataTransfer.AzureTable.UnitTests.csproj @@ -0,0 +1,96 @@ + + + + Debug + AnyCPU + {428B3874-8780-4939-AC5A-79C3980E01C7} + Library + Properties + Microsoft.DataTransfer.AzureTable.UnitTests + Microsoft.DataTransfer.AzureTable.UnitTests + v4.5.2 + 512 + {3AC096D0-A1C2-E12C-1390-A8335801FDAB};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} + 15.0 + 
$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion) + $(ProgramFiles)\Common Files\microsoft shared\VSTT\$(VisualStudioVersion)\UITestExtensionPackages + False + UnitTest + + + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + ..\..\packages\Castle.Core.3.3.3\lib\net45\Castle.Core.dll + True + + + ..\..\packages\Microsoft.Azure.DocumentDB.2.2.1\lib\net45\Microsoft.Azure.Documents.Client.dll + + + ..\..\packages\Moq.4.5.21\lib\net45\Moq.dll + True + + + ..\..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll + True + + + + + + + + + + + + + + + + + + + Properties\CommonAssemblyInfo.cs + + + + + + + + + + {366ba489-e851-4899-9ba3-2f9c7599d24b} + Microsoft.DataTransfer.AzureTable + + + + + + + This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}. 
+ + + + + + \ No newline at end of file diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/Properties/AssemblyInfo.cs b/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..d46d5e1 --- /dev/null +++ b/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/Properties/AssemblyInfo.cs @@ -0,0 +1,10 @@ +using System.Reflection; +using System.Runtime.CompilerServices; +using System.Runtime.InteropServices; + +[assembly: AssemblyTitle("Microsoft.DataTransfer.AzureTable.UnitTests")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyProduct("Microsoft.DataTransfer.AzureTable.UnitTests")] +[assembly: AssemblyCulture("")] + +[assembly: Guid("428b3874-8780-4939-ac5a-79c3980e01c7")] diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/packages.config b/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/packages.config new file mode 100644 index 0000000..bcf1b2a --- /dev/null +++ b/AzureTable/Microsoft.DataTransfer.AzureTable.UnitTests/packages.config @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable/ContinuationTokenParser.cs b/AzureTable/Microsoft.DataTransfer.AzureTable/ContinuationTokenParser.cs new file mode 100644 index 0000000..8f0bbea --- /dev/null +++ b/AzureTable/Microsoft.DataTransfer.AzureTable/ContinuationTokenParser.cs @@ -0,0 +1,57 @@ +using System; +using System.Text; + +namespace Microsoft.DataTransfer.AzureTable +{ + /// + /// The class which is used to encode the continuation token + /// + public static class ContinuationTokenParser + { + private const char ExclamationDelimiter = '!'; + + /// + /// Generates the encoded continuation token with format (Version)!(TokenLength)!(CustomBase64EncodedToken) + /// + /// The string that you want to encode into continuation token + /// The encoded continuation token + /// <exception cref="ArgumentNullException"><paramref name="key"/> is null.</exception> + public static string EncodeContinuationToken(string key) + { + if (key == null) + { + throw new ArgumentNullException(nameof(key)); + } + + StringBuilder 
encodedContinuationToken = new StringBuilder(); + // Version of the ContinuationToken + encodedContinuationToken.Append(1); + encodedContinuationToken.Append(ExclamationDelimiter); + + string base64EncodedToken = Convert.ToBase64String(Encoding.UTF8.GetBytes(key)); + + // Size is the length of the base64 encoded key + encodedContinuationToken.Append(base64EncodedToken.Length); + encodedContinuationToken.Append(ExclamationDelimiter); + + foreach (char c in base64EncodedToken) + { + encodedContinuationToken.Append(TranslateChar(c)); + } + + return encodedContinuationToken.ToString(); + } + + // Maps the three non-alphanumeric base64 characters ('/', '+', '=') to '_', '*', '-' + private static char TranslateChar(char c) + { + switch (c) + { + case '/': return '_'; + case '+': return '*'; + case '=': return '-'; + default: return c; + } + } + } +} diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable/Microsoft.DataTransfer.AzureTable.csproj b/AzureTable/Microsoft.DataTransfer.AzureTable/Microsoft.DataTransfer.AzureTable.csproj index 6eefa9c..bb09a51 100644 --- a/AzureTable/Microsoft.DataTransfer.AzureTable/Microsoft.DataTransfer.AzureTable.csproj +++ b/AzureTable/Microsoft.DataTransfer.AzureTable/Microsoft.DataTransfer.AzureTable.csproj @@ -101,6 +101,7 @@ True ConfigurationResources.resx + @@ -111,6 +112,8 @@ True Resources.resx + + diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable/Resumption/AzureTablePrimaryKey.cs b/AzureTable/Microsoft.DataTransfer.AzureTable/Resumption/AzureTablePrimaryKey.cs new file mode 100644 index 0000000..2776b51 --- /dev/null +++ b/AzureTable/Microsoft.DataTransfer.AzureTable/Resumption/AzureTablePrimaryKey.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Microsoft.DataTransfer.AzureTable.Resumption +{ + /// + /// Define the checkpoint for Azure Table data transfer + /// + public class AzureTablePrimaryKey + { + /// + /// The partition key of the checkpoint + /// + public string PartitionKey 
{ get; set; } + + /// + /// The row key of the checkpoint + /// + public string RowKey { get; set; } + } +} diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable/Resumption/AzureTableResumptionAdaptor.cs b/AzureTable/Microsoft.DataTransfer.AzureTable/Resumption/AzureTableResumptionAdaptor.cs new file mode 100644 index 0000000..46cc2a4 --- /dev/null +++ b/AzureTable/Microsoft.DataTransfer.AzureTable/Resumption/AzureTableResumptionAdaptor.cs @@ -0,0 +1,74 @@ +using Microsoft.DataTransfer.Basics; +using Microsoft.DataTransfer.Extensibility; +using Newtonsoft.Json; +using System; +using System.IO; + +namespace Microsoft.DataTransfer.AzureTable.Resumption +{ + /// + /// Adaptor for the resume functionality for data transfer between Azure Table Storage + /// + public class AzureTableResumptionAdaptor : IDataTransferResumptionAdapter + { + private readonly string _fileFullPath; + private const string _folderName = "ResumeCheckpoint"; + + /// + /// Create an instance of + /// + /// The name of the checkpoint file + public AzureTableResumptionAdaptor(string fileName) + { + Guard.NotEmpty(nameof(fileName), fileName); + + if (fileName.IndexOfAny(Path.GetInvalidFileNameChars()) >= 0) + { + throw new ArgumentException("File name contains invalid characters."); + } + + Directory.CreateDirectory(_folderName); + _fileFullPath = Path.Combine(_folderName, fileName); + } + + /// + /// Get the checkpoint from the file + /// + /// The checkpoint + public AzureTablePrimaryKey GetCheckpoint() + { + if (File.Exists(_fileFullPath)) + { + return JsonConvert.DeserializeObject(File.ReadAllText(_fileFullPath)); + } + + return null; + } + + /// + /// Save the checkpoint to the file + /// + /// The checkpoint to store + public void SaveCheckpoint(AzureTablePrimaryKey checkpoint) + { + Guard.NotNull(nameof(checkpoint), checkpoint); + + using (StreamWriter file = File.CreateText(_fileFullPath)) + { + JsonSerializer serializer = new JsonSerializer(); + serializer.Serialize(file, checkpoint); + 
} + } + + /// + /// Delete the file which stores the checkpoint + /// + public void DeleteCheckpoint() + { + if (File.Exists(_fileFullPath)) + { + File.Delete(_fileFullPath); + } + } + } +} diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable/Sink/TableAPIBulkSinkAdapter.cs b/AzureTable/Microsoft.DataTransfer.AzureTable/Sink/TableAPIBulkSinkAdapter.cs index 69a8b2b..d313006 100644 --- a/AzureTable/Microsoft.DataTransfer.AzureTable/Sink/TableAPIBulkSinkAdapter.cs +++ b/AzureTable/Microsoft.DataTransfer.AzureTable/Sink/TableAPIBulkSinkAdapter.cs @@ -4,6 +4,7 @@ using Microsoft.Azure.CosmosDB.Table; using Microsoft.Azure.Storage; using Microsoft.Azure.Storage.RetryPolicies; + using Microsoft.DataTransfer.AzureTable.Resumption; using Microsoft.DataTransfer.AzureTable.Sink.Bulk; using Microsoft.DataTransfer.AzureTable.Source; using Microsoft.DataTransfer.AzureTable.Utils; @@ -26,6 +27,7 @@ internal sealed class TableAPIBulkSinkAdapter : IDataSinkAdapter private readonly long _maxInputBufferSizeInBytes; private readonly int _throughput; private readonly int _maxLengthInBytesPerBatch; + private readonly IDataTransferResumptionAdapter _resumptionAdapter; private CloudTable cloudtable; private ConcurrentDictionary dict; @@ -39,7 +41,8 @@ public int MaxDegreeOfParallelism } public TableAPIBulkSinkAdapter(string connectionString, string tableName, - bool overwrite, long maxInputBufferSizeInBytes, int throughput, int batchSize) + bool overwrite, long maxInputBufferSizeInBytes, int throughput, int batchSize, + IDataTransferResumptionAdapter resumptionAdapter) { _connectionString = connectionString; _tableName = tableName; @@ -47,6 +50,7 @@ public TableAPIBulkSinkAdapter(string connectionString, string tableName, _maxInputBufferSizeInBytes = maxInputBufferSizeInBytes; _throughput = throughput; _maxLengthInBytesPerBatch = batchSize; + _resumptionAdapter = resumptionAdapter; CloudStorageAccount storageAccount = CloudStorageAccount.Parse(_connectionString); @@ -75,6 +79,11 @@ 
public async Task InitializeAsync(CancellationToken cancellation) public async Task WriteAsync(IDataItem dataItem, CancellationToken cancellation) { var item = GetITableEntityFromIDataItem(dataItem); + if (dict.Count == 0 && !cancellation.IsCancellationRequested) + { + _resumptionAdapter?.SaveCheckpoint( + new AzureTablePrimaryKey { PartitionKey = item.PartitionKey, RowKey = item.RowKey }); + } inputSizeTracker.Add(item); diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable/Sink/TableAPIBulkSinkAdapterFactory.cs b/AzureTable/Microsoft.DataTransfer.AzureTable/Sink/TableAPIBulkSinkAdapterFactory.cs index a2ae142..1c6eb61 100644 --- a/AzureTable/Microsoft.DataTransfer.AzureTable/Sink/TableAPIBulkSinkAdapterFactory.cs +++ b/AzureTable/Microsoft.DataTransfer.AzureTable/Sink/TableAPIBulkSinkAdapterFactory.cs @@ -1,6 +1,7 @@ namespace Microsoft.DataTransfer.TableAPI.Sink.Bulk { using Microsoft.DataTransfer.AzureTable; + using Microsoft.DataTransfer.AzureTable.Resumption; using Microsoft.DataTransfer.Basics; using Microsoft.DataTransfer.Extensibility; using Microsoft.DataTransfer.Extensibility.Basics; @@ -61,7 +62,8 @@ public async Task CreateAsync(ITableAPIBulkSinkAdapterConfigur var sink = new TableAPIBulkSinkAdapter(configuration.ConnectionString, configuration.TableName, configuration.Overwrite, - maxInputBufferSizeInBytes, throughput, batchSize); + maxInputBufferSizeInBytes, throughput, batchSize, + context.EnableResumeFunction ? 
new AzureTableResumptionAdaptor(context.RunConfigSignature + ".json") : null); await sink.InitializeAsync(cancellation); diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable/Source/AzureTableSourceAdapter.cs b/AzureTable/Microsoft.DataTransfer.AzureTable/Source/AzureTableSourceAdapter.cs index a3fb306..e915a9a 100644 --- a/AzureTable/Microsoft.DataTransfer.AzureTable/Source/AzureTableSourceAdapter.cs +++ b/AzureTable/Microsoft.DataTransfer.AzureTable/Source/AzureTableSourceAdapter.cs @@ -6,6 +6,7 @@ using Microsoft.Azure.Storage; using Microsoft.Azure.Storage.RetryPolicies; using Microsoft.DataTransfer.AzureTable.Client; +using Microsoft.DataTransfer.AzureTable.Resumption; using Microsoft.DataTransfer.Extensibility; namespace Microsoft.DataTransfer.AzureTable.Source @@ -20,12 +21,14 @@ sealed class AzureTableSourceAdapter : IDataSourceAdapter private readonly IAzureTableSourceAdapterInstanceConfiguration configuration; private readonly CloudTable table; private readonly TableQuery query; + private readonly IDataTransferResumptionAdapter _resumptionAdapter; private readonly TableRequestOptions requestOptions; private Task> segmentDownloadTask; private int currentEntityIndex; - public AzureTableSourceAdapter(IAzureTableSourceAdapterInstanceConfiguration configuration) + public AzureTableSourceAdapter(IAzureTableSourceAdapterInstanceConfiguration configuration, + IDataTransferResumptionAdapter resumptionAdapter) { this.configuration = configuration; @@ -44,6 +47,7 @@ public AzureTableSourceAdapter(IAzureTableSourceAdapterInstanceConfiguration con FilterString = configuration.Filter, SelectColumns = configuration.Projection == null ? 
null : new List(configuration.Projection) }; + _resumptionAdapter = resumptionAdapter; requestOptions = new TableRequestOptions() { @@ -55,7 +59,18 @@ public async Task ReadNextAsync(ReadOutputByRef readOutput, Cancellat { if (segmentDownloadTask == null) { - MoveToNextSegment(null, cancellation); + TableContinuationToken continuationToken = null; + var checkpoint = _resumptionAdapter?.GetCheckpoint(); + if (checkpoint != null) + { + continuationToken = new TableContinuationToken + { + NextPartitionKey = ContinuationTokenParser.EncodeContinuationToken(checkpoint.PartitionKey), + NextRowKey = ContinuationTokenParser.EncodeContinuationToken(checkpoint.RowKey) + }; + } + + MoveToNextSegment(continuationToken, cancellation); } var currentSegment = await segmentDownloadTask; diff --git a/AzureTable/Microsoft.DataTransfer.AzureTable/Source/AzureTableSourceAdapterFactory.cs b/AzureTable/Microsoft.DataTransfer.AzureTable/Source/AzureTableSourceAdapterFactory.cs index 26e7f45..f1d600e 100644 --- a/AzureTable/Microsoft.DataTransfer.AzureTable/Source/AzureTableSourceAdapterFactory.cs +++ b/AzureTable/Microsoft.DataTransfer.AzureTable/Source/AzureTableSourceAdapterFactory.cs @@ -1,4 +1,5 @@ -using Microsoft.DataTransfer.Basics; +using Microsoft.DataTransfer.AzureTable.Resumption; +using Microsoft.DataTransfer.Basics; using Microsoft.DataTransfer.Extensibility; using Microsoft.DataTransfer.Extensibility.Basics; using System; @@ -29,17 +30,18 @@ public string Description /// Task that represents asynchronous create operation. 
public Task CreateAsync(IAzureTableSourceAdapterConfiguration configuration, IDataTransferContext context, CancellationToken cancellation) { - return Task.Factory.StartNew(() => Create(configuration), cancellation); + return Task.Factory.StartNew(() => Create(configuration, context), cancellation); } - private static IDataSourceAdapter Create(IAzureTableSourceAdapterConfiguration configuration) + private static IDataSourceAdapter Create(IAzureTableSourceAdapterConfiguration configuration, IDataTransferContext context) { Guard.NotNull("configuration", configuration); if (String.IsNullOrEmpty(configuration.ConnectionString)) throw Errors.ConnectionStringMissing(); - return new AzureTableSourceAdapter(CreateInstanceConfiguration(configuration)); + return new AzureTableSourceAdapter(CreateInstanceConfiguration(configuration), + context != null && context.EnableResumeFunction ? new AzureTableResumptionAdaptor(context.RunConfigSignature + ".json") : null); } private static IAzureTableSourceAdapterInstanceConfiguration CreateInstanceConfiguration(IAzureTableSourceAdapterConfiguration configuration) diff --git a/Console/Microsoft.DataTransfer.ConsoleHost/App/Handlers/OneTimeDataTransferHandler.cs b/Console/Microsoft.DataTransfer.ConsoleHost/App/Handlers/OneTimeDataTransferHandler.cs index 5d3ad0a..74cec9b 100644 --- a/Console/Microsoft.DataTransfer.ConsoleHost/App/Handlers/OneTimeDataTransferHandler.cs +++ b/Console/Microsoft.DataTransfer.ConsoleHost/App/Handlers/OneTimeDataTransferHandler.cs @@ -20,7 +20,8 @@ sealed class OneTimeDataTransferHandler : ITransferHandler private readonly IOneTimeDataTransferConfiguration configuration; public OneTimeDataTransferHandler(IDataTransferService transferService, IDataAdapterConfigurationFactory dataAdapterConfiguration, - ITransferStatisticsHandler statisticsHandler, ITransferStatisticsConfiguration statisticsConfiguration, IOneTimeDataTransferConfiguration configuration) + ITransferStatisticsHandler statisticsHandler, 
ITransferStatisticsConfiguration statisticsConfiguration, + IOneTimeDataTransferConfiguration configuration) { this.transferService = transferService; this.dataAdapterConfiguration = dataAdapterConfiguration; @@ -59,7 +60,8 @@ await transferService // With statistics statistics, // Allow cancellation - cancellation.Token); + cancellation.Token, + statisticsConfiguration.EnableResumption); } } diff --git a/Console/Microsoft.DataTransfer.ConsoleHost/Configuration/IInfrastructureConfiguration.cs b/Console/Microsoft.DataTransfer.ConsoleHost/Configuration/IInfrastructureConfiguration.cs index 03f396b..59c6640 100644 --- a/Console/Microsoft.DataTransfer.ConsoleHost/Configuration/IInfrastructureConfiguration.cs +++ b/Console/Microsoft.DataTransfer.ConsoleHost/Configuration/IInfrastructureConfiguration.cs @@ -1,4 +1,5 @@ -using Microsoft.DataTransfer.ServiceModel.Errors; +using Microsoft.DataTransfer.ServiceModel; +using Microsoft.DataTransfer.ServiceModel.Errors; using Microsoft.DataTransfer.ServiceModel.Statistics; namespace Microsoft.DataTransfer.ConsoleHost.Configuration @@ -9,5 +10,7 @@ namespace Microsoft.DataTransfer.ConsoleHost.Configuration /// /// This needs to be public to allow automatic proxy class generation. 
/// - public interface IInfrastructureConfiguration : ITransferStatisticsConfiguration, IErrorDetailsConfiguration { } + public interface IInfrastructureConfiguration : + ITransferStatisticsConfiguration, IErrorDetailsConfiguration + { } } diff --git a/Core/Microsoft.DataTransfer.Core/Errors.cs b/Core/Microsoft.DataTransfer.Core/Errors.cs index 741b123..35eec05 100644 --- a/Core/Microsoft.DataTransfer.Core/Errors.cs +++ b/Core/Microsoft.DataTransfer.Core/Errors.cs @@ -34,5 +34,10 @@ public static Exception NonGenericDataAdapterFactoryType(Type type) { return new InvalidOperationException(FormatMessage(Resources.NonGenericDataAdapterFactoryTypeFormat, type)); } + + public static Exception UnsupportedDataSourceOrSinkForResumption(string name) + { + return new NotSupportedException(FormatMessage(Resources.UnsupportedDataSourceOrSinkForResumptionFormat, name)); + } } } diff --git a/Core/Microsoft.DataTransfer.Core/Microsoft.DataTransfer.Core.csproj b/Core/Microsoft.DataTransfer.Core/Microsoft.DataTransfer.Core.csproj index c1c6e41..000c435 100644 --- a/Core/Microsoft.DataTransfer.Core/Microsoft.DataTransfer.Core.csproj +++ b/Core/Microsoft.DataTransfer.Core/Microsoft.DataTransfer.Core.csproj @@ -75,6 +75,10 @@ + + ..\..\packages\Newtonsoft.Json.10.0.2\lib\net45\Newtonsoft.Json.dll + True + ..\..\packages\System.Spatial.5.8.2\lib\net40\System.Spatial.dll diff --git a/Core/Microsoft.DataTransfer.Core/Resources.Designer.cs b/Core/Microsoft.DataTransfer.Core/Resources.Designer.cs index 4008b80..f835b1f 100644 --- a/Core/Microsoft.DataTransfer.Core/Resources.Designer.cs +++ b/Core/Microsoft.DataTransfer.Core/Resources.Designer.cs @@ -1,7 +1,7 @@ //------------------------------------------------------------------------------ // // This code was generated by a tool. -// Runtime Version:4.0.30319.34209 +// Runtime Version:4.0.30319.42000 // // Changes to this file may cause incorrect behavior and will be lost if // the code is regenerated. 
@@ -19,7 +19,7 @@ namespace Microsoft.DataTransfer.Core { // class via a tool like ResGen or Visual Studio. // To add or remove a member, edit your .ResX file then rerun ResGen // with the /str option, or rebuild your VS project. - [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "4.0.0.0")] + [global::System.CodeDom.Compiler.GeneratedCodeAttribute("System.Resources.Tools.StronglyTypedResourceBuilder", "15.0.0.0")] [global::System.Diagnostics.DebuggerNonUserCodeAttribute()] [global::System.Runtime.CompilerServices.CompilerGeneratedAttribute()] internal class Resources { @@ -104,5 +104,14 @@ internal static string UnknownDataSourceFormat { return ResourceManager.GetString("UnknownDataSourceFormat", resourceCulture); } } + + /// + /// Looks up a localized string similar to Data sink or source '{0}' is not supported for data transfer resumption. + /// + internal static string UnsupportedDataSourceOrSinkForResumptionFormat { + get { + return ResourceManager.GetString("UnsupportedDataSourceOrSinkForResumptionFormat", resourceCulture); + } + } } } diff --git a/Core/Microsoft.DataTransfer.Core/Resources.resx b/Core/Microsoft.DataTransfer.Core/Resources.resx index 79da0e4..6a2c461 100644 --- a/Core/Microsoft.DataTransfer.Core/Resources.resx +++ b/Core/Microsoft.DataTransfer.Core/Resources.resx @@ -132,4 +132,7 @@ Data source '{0}' is not known + + Data sink or source '{0}' is not supported for data transfer resumption + \ No newline at end of file diff --git a/Core/Microsoft.DataTransfer.Core/Service/DataTransferContext.cs b/Core/Microsoft.DataTransfer.Core/Service/DataTransferContext.cs index 4ed37c2..0c98589 100644 --- a/Core/Microsoft.DataTransfer.Core/Service/DataTransferContext.cs +++ b/Core/Microsoft.DataTransfer.Core/Service/DataTransferContext.cs @@ -6,5 +6,7 @@ sealed class DataTransferContext : IDataTransferContext { public string SourceName { get; set; } public string SinkName { get; set; } + public 
string RunConfigSignature { get; set; } + public bool EnableResumeFunction { get; set; } } } diff --git a/Core/Microsoft.DataTransfer.Core/Service/DataTransferService.cs b/Core/Microsoft.DataTransfer.Core/Service/DataTransferService.cs index 137a96a..05a57b2 100644 --- a/Core/Microsoft.DataTransfer.Core/Service/DataTransferService.cs +++ b/Core/Microsoft.DataTransfer.Core/Service/DataTransferService.cs @@ -3,6 +3,8 @@ using Microsoft.DataTransfer.ServiceModel; using Microsoft.DataTransfer.ServiceModel.Entities; using Microsoft.DataTransfer.ServiceModel.Statistics; +using Newtonsoft.Json; +using System; using System.Collections.Generic; using System.Linq; using System.Threading; @@ -17,6 +19,12 @@ sealed class DataTransferService : IDataTransferService private IReadOnlyDictionary sources; private IReadOnlyDictionary sinks; + private IReadOnlyList resumeFunctionSupportSourcesAndSinks = new List + { + "AzureTable", + "TableAPIBulk" + }; + public DataTransferService( IReadOnlyDictionary sources, IReadOnlyDictionary sinks, @@ -42,7 +50,8 @@ public IReadOnlyDictionary GetKnownSinks() } public async Task TransferAsync(string sourceName, object sourceConfiguration, - string sinkName, object sinkConfiguration, ITransferStatistics statistics, CancellationToken cancellation) + string sinkName, object sinkConfiguration, ITransferStatistics statistics, + CancellationToken cancellation, bool enableResumeFunction = false) { IDataSourceAdapterFactoryAdapter sourceFactoryAdapter; if (!sources.TryGetValue(sourceName, out sourceFactoryAdapter)) @@ -52,10 +61,23 @@ public async Task TransferAsync(string sourceName, object sourceConfiguration, if (!sinks.TryGetValue(sinkName, out sinkFactoryAdapter)) throw Errors.UnknownDataSink(sinkName); + if (enableResumeFunction) + { + if (!resumeFunctionSupportSourcesAndSinks.Contains(sourceName)) + throw Errors.UnsupportedDataSourceOrSinkForResumption(sourceName); + + if (!resumeFunctionSupportSourcesAndSinks.Contains(sinkName)) + throw 
Errors.UnsupportedDataSourceOrSinkForResumption(sinkName); + } + + // The signature hashes both serialized configurations; adapters use it to name their checkpoint file var context = new DataTransferContext { SourceName = sourceName, - SinkName = sinkName + SinkName = sinkName, + RunConfigSignature = GetStringSha256Hash( + JsonConvert.SerializeObject(sourceConfiguration) + JsonConvert.SerializeObject(sinkConfiguration)), + EnableResumeFunction = enableResumeFunction }; try @@ -75,5 +97,23 @@ public async Task TransferAsync(string sourceName, object sourceConfiguration, statistics.Stop(); } } + + /// <summary> + /// Computes the SHA-256 hash of the UTF-8 bytes of <paramref name="text"/> as an uppercase hexadecimal string without separators. + /// </summary> + /// <param name="text">Text to hash.</param> + /// <returns>The hexadecimal digest, or an empty string when <paramref name="text"/> is null or empty.</returns> + private static string GetStringSha256Hash(string text) + { + if (string.IsNullOrEmpty(text)) + return string.Empty; + + using (var sha = System.Security.Cryptography.SHA256.Create()) + { + byte[] textData = System.Text.Encoding.UTF8.GetBytes(text); + byte[] hash = sha.ComputeHash(textData); + return BitConverter.ToString(hash).Replace("-", string.Empty); + } + } } } diff --git a/Core/Microsoft.DataTransfer.Extensibility/IDataTransferContext.cs b/Core/Microsoft.DataTransfer.Extensibility/IDataTransferContext.cs index a371b77..1dd31fd 100644 --- a/Core/Microsoft.DataTransfer.Extensibility/IDataTransferContext.cs +++ b/Core/Microsoft.DataTransfer.Extensibility/IDataTransferContext.cs @@ -15,5 +15,15 @@ public interface IDataTransferContext /// Gets name of the data sink adapter. 
/// </summary> string SinkName { get; } + + /// <summary> + /// Gets the signature of the source and sink configuration + /// </summary> + string RunConfigSignature { get; } + + /// <summary> + /// Whether to enable the resume function + /// </summary> + bool EnableResumeFunction { get; } } } diff --git a/Core/Microsoft.DataTransfer.Extensibility/IDataTransferResumptionAdapter.cs b/Core/Microsoft.DataTransfer.Extensibility/IDataTransferResumptionAdapter.cs new file mode 100644 index 0000000..b45ba5d --- /dev/null +++ b/Core/Microsoft.DataTransfer.Extensibility/IDataTransferResumptionAdapter.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace Microsoft.DataTransfer.Extensibility +{ + /// <summary> + /// The interface for the resumption adapters in data transfer + /// </summary> + /// <typeparam name="TCheckpoint">The type of checkpoint that you want to create</typeparam> + public interface IDataTransferResumptionAdapter<TCheckpoint> + { + /// <summary> + /// Save the resumption checkpoint + /// </summary> + /// <param name="checkpoint">The checkpoint that you want to save</param> + void SaveCheckpoint(TCheckpoint checkpoint); + + /// <summary> + /// Get the resumption checkpoint + /// </summary> + /// <returns>The checkpoint that you saved in the last run of data transfer action</returns> + TCheckpoint GetCheckpoint(); + + /// <summary> + /// Clean up the checkpoint file if needed + /// </summary> + void DeleteCheckpoint(); + } +} diff --git a/Core/Microsoft.DataTransfer.Extensibility/Microsoft.DataTransfer.Extensibility.csproj b/Core/Microsoft.DataTransfer.Extensibility/Microsoft.DataTransfer.Extensibility.csproj index 7279ca9..6f15d99 100644 --- a/Core/Microsoft.DataTransfer.Extensibility/Microsoft.DataTransfer.Extensibility.csproj +++ b/Core/Microsoft.DataTransfer.Extensibility/Microsoft.DataTransfer.Extensibility.csproj @@ -51,6 +51,7 @@ + diff --git a/Core/Microsoft.DataTransfer.ServiceModel/ConfigurationResources.Designer.cs b/Core/Microsoft.DataTransfer.ServiceModel/ConfigurationResources.Designer.cs index e470673..cb018b5 100644 --- 
a/Core/Microsoft.DataTransfer.ServiceModel/ConfigurationResources.Designer.cs +++ b/Core/Microsoft.DataTransfer.ServiceModel/ConfigurationResources.Designer.cs @@ -87,6 +87,15 @@ public static string Statistics_EnableCosmosTableLog { } } + /// + /// Looks up a localized string similar to Optional. Enables saving checkpoint files in the ./ResumeCheckpoint folder and resuming from them the next time you run with the same source and target configuration. + /// + public static string Statistics_EnableResumption { + get { + return ResourceManager.GetString("Statistics_EnableResumption", resourceCulture); + } + } + /// /// Looks up a localized string similar to Optional. Name of the CSV file to redirect data transfer failures. /// diff --git a/Core/Microsoft.DataTransfer.ServiceModel/ConfigurationResources.resx b/Core/Microsoft.DataTransfer.ServiceModel/ConfigurationResources.resx index feb52db..dadb6b2 100644 --- a/Core/Microsoft.DataTransfer.ServiceModel/ConfigurationResources.resx +++ b/Core/Microsoft.DataTransfer.ServiceModel/ConfigurationResources.resx @@ -135,4 +135,7 @@ Optional, default is {0}. Time interval to refresh on-screen data transfer progress + + Optional. Enables saving checkpoint files in the ./ResumeCheckpoint folder and resuming from them the next time you run with the same source and target configuration + \ No newline at end of file diff --git a/Core/Microsoft.DataTransfer.ServiceModel/IDataTransferService.cs b/Core/Microsoft.DataTransfer.ServiceModel/IDataTransferService.cs index 6512dd9..1a27bdd 100644 --- a/Core/Microsoft.DataTransfer.ServiceModel/IDataTransferService.cs +++ b/Core/Microsoft.DataTransfer.ServiceModel/IDataTransferService.cs @@ -31,12 +31,14 @@ public interface IDataTransferService /// Name of the target data adapter. /// Target data adapter configuration. /// Instance of <see cref="ITransferStatistics" /> to report data transfer progress to. + /// Whether to enable saving and resuming from the last checkpoint. /// Cancellation token. 
/// Task that represents asynchronous data transfer operation. Task TransferAsync( string sourceName, object sourceConfiguration, string sinkName, object sinkConfiguration, ITransferStatistics statistics, - CancellationToken cancellation); + CancellationToken cancellation, + bool enableResumeFunction = false); } } diff --git a/Core/Microsoft.DataTransfer.ServiceModel/Statistics/ITransferStatisticsConfiguration.cs b/Core/Microsoft.DataTransfer.ServiceModel/Statistics/ITransferStatisticsConfiguration.cs index 87661b1..4f22f7c 100644 --- a/Core/Microsoft.DataTransfer.ServiceModel/Statistics/ITransferStatisticsConfiguration.cs +++ b/Core/Microsoft.DataTransfer.ServiceModel/Statistics/ITransferStatisticsConfiguration.cs @@ -37,5 +37,11 @@ public interface ITransferStatisticsConfiguration /// [Display(ResourceType = typeof(DynamicConfigurationResources), Description = "Statistics_ProgressUpdateInterval")] TimeSpan? ProgressUpdateInterval { get; } + + /// + /// Whether to allow saving or resuming from a checkpoint + /// + [Display(ResourceType = typeof(ConfigurationResources), Description = "Statistics_EnableResumption")] + bool EnableResumption { get; } } } diff --git a/DataTransfer.sln b/DataTransfer.sln index ff274e0..869b9d2 100644 --- a/DataTransfer.sln +++ b/DataTransfer.sln @@ -150,6 +150,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.Basi EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.MongoDb.UnitTests", "MongoDb\Microsoft.DataTransfer.MongoDb.UnitTests\Microsoft.DataTransfer.MongoDb.UnitTests.csproj", "{2AC0E216-9ED4-4D1D-A264-73905A37F6AA}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Microsoft.DataTransfer.AzureTable.UnitTests", "AzureTable\Microsoft.DataTransfer.AzureTable.UnitTests\Microsoft.DataTransfer.AzureTable.UnitTests.csproj", "{428B3874-8780-4939-AC5A-79C3980E01C7}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|x64 = 
Debug|x64 @@ -356,18 +358,10 @@ Global {2AC0E216-9ED4-4D1D-A264-73905A37F6AA}.Debug|x64.Build.0 = Debug|x64 {2AC0E216-9ED4-4D1D-A264-73905A37F6AA}.Release|x64.ActiveCfg = Release|x64 {2AC0E216-9ED4-4D1D-A264-73905A37F6AA}.Release|x64.Build.0 = Release|x64 - {3C508C20-FC36-439A-AD68-5FE2170172D0}.Debug|x64.ActiveCfg = Debug|x64 - {3C508C20-FC36-439A-AD68-5FE2170172D0}.Debug|x64.Build.0 = Debug|x64 - {3C508C20-FC36-439A-AD68-5FE2170172D0}.Release|x64.ActiveCfg = Release|x64 - {3C508C20-FC36-439A-AD68-5FE2170172D0}.Release|x64.Build.0 = Release|x64 - {152DEF82-24B7-46B8-A6E4-09F90D5332B1}.Debug|x64.ActiveCfg = Debug|x64 - {152DEF82-24B7-46B8-A6E4-09F90D5332B1}.Debug|x64.Build.0 = Debug|x64 - {152DEF82-24B7-46B8-A6E4-09F90D5332B1}.Release|x64.ActiveCfg = Release|x64 - {152DEF82-24B7-46B8-A6E4-09F90D5332B1}.Release|x64.Build.0 = Release|x64 - {A5013A12-18EA-40B6-83E0-4A59DC1B9E74}.Debug|x64.ActiveCfg = Debug|x64 - {A5013A12-18EA-40B6-83E0-4A59DC1B9E74}.Debug|x64.Build.0 = Debug|x64 - {A5013A12-18EA-40B6-83E0-4A59DC1B9E74}.Release|x64.ActiveCfg = Release|x64 - {A5013A12-18EA-40B6-83E0-4A59DC1B9E74}.Release|x64.Build.0 = Release|x64 + {428B3874-8780-4939-AC5A-79C3980E01C7}.Debug|x64.ActiveCfg = Debug|x64 + {428B3874-8780-4939-AC5A-79C3980E01C7}.Debug|x64.Build.0 = Debug|x64 + {428B3874-8780-4939-AC5A-79C3980E01C7}.Release|x64.ActiveCfg = Release|x64 + {428B3874-8780-4939-AC5A-79C3980E01C7}.Release|x64.Build.0 = Release|x64 EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -423,9 +417,10 @@ Global {6544F78F-4EE3-489E-87B7-5FCA9C4D50BD} = {1BD0D669-8E45-4E7C-A20F-707A1887E8ED} {DA182D5C-79F4-4AF6-BF15-6E4496353A6A} = {F9CAC1F5-436E-4406-BACC-FC18C8FE36C5} {2AC0E216-9ED4-4D1D-A264-73905A37F6AA} = {7F83D352-1039-4B8F-B63C-56231421056A} + {428B3874-8780-4939-AC5A-79C3980E01C7} = {7FF55CC5-9069-49E5-B16E-6437AE4892A7} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution - SolutionGuid = 
{D06EC8A2-02FC-48D4-BC6B-D86A26FED0CC} EnterpriseLibraryConfigurationToolBinariesPathV6 = packages\EnterpriseLibrary.TransientFaultHandling.6.0.1304.0\lib\portable-net45+win+wp8;packages\EnterpriseLibrary.TransientFaultHandling.Data.6.0.1304.1\lib\NET45 + SolutionGuid = {D06EC8A2-02FC-48D4-BC6B-D86A26FED0CC} EndGlobalSection EndGlobal diff --git a/Shared/Microsoft.DataTransfer.TestsCommon/Mocks/DataTransferContextMock.cs b/Shared/Microsoft.DataTransfer.TestsCommon/Mocks/DataTransferContextMock.cs index e31d2d7..6b88e18 100644 --- a/Shared/Microsoft.DataTransfer.TestsCommon/Mocks/DataTransferContextMock.cs +++ b/Shared/Microsoft.DataTransfer.TestsCommon/Mocks/DataTransferContextMock.cs @@ -15,5 +15,15 @@ public string SinkName { get { return "TestSink"; } } + + public string RunConfigSignature + { + get { return "TestRunConfigSignature"; } + } + + public bool EnableResumeFunction + { + get { return false; } + } } } diff --git a/Wpf/Microsoft.DataTransfer.WpfHost/Steps/InfrastructureSetup/InfrastructureConfiguration.cs b/Wpf/Microsoft.DataTransfer.WpfHost/Steps/InfrastructureSetup/InfrastructureConfiguration.cs index 6d8c01b..8bfcd62 100644 --- a/Wpf/Microsoft.DataTransfer.WpfHost/Steps/InfrastructureSetup/InfrastructureConfiguration.cs +++ b/Wpf/Microsoft.DataTransfer.WpfHost/Steps/InfrastructureSetup/InfrastructureConfiguration.cs @@ -36,6 +36,8 @@ public TimeSpan? ProgressUpdateInterval set { SetProperty(ref progressUpdateInterval, value); } } + public bool EnableResumption => false; + public bool EnableCosmosTableLog { get { return true; }