diff --git a/src/Microsoft.Health.Fhir.SqlServer.UnitTests/Features/Operations/Import/ImportProcessingJobTests.cs b/src/Microsoft.Health.Fhir.SqlServer.UnitTests/Features/Operations/Import/ImportProcessingJobTests.cs index b569dd4b21..0796a1c5fa 100644 --- a/src/Microsoft.Health.Fhir.SqlServer.UnitTests/Features/Operations/Import/ImportProcessingJobTests.cs +++ b/src/Microsoft.Health.Fhir.SqlServer.UnitTests/Features/Operations/Import/ImportProcessingJobTests.cs @@ -399,6 +399,73 @@ private ImportProcessingJobDefinition GetInputData() return inputData; } + [Fact] + public async Task GivenImportInput_WhenConstraintViolationOccurs_ThenImportShouldAbort() + { + ImportProcessingJobDefinition inputData = GetInputData(); + + IImportResourceLoader loader = Substitute.For(); + IImporter importer = Substitute.For(); + IImportErrorStore importErrorStore = Substitute.For(); + IImportErrorStoreFactory importErrorStoreFactory = Substitute.For(); + RequestContextAccessor contextAccessor = Substitute.For>(); + ILoggerFactory loggerFactory = new NullLoggerFactory(); + IMediator mediator = Substitute.For(); + IAuditLogger auditLogger = Substitute.For(); + IQueueClient queueClient = Substitute.For(); + + loader.LoadResources(Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + Channel resourceChannel = Channel.CreateUnbounded(); + + Task loadTask = Task.Run(async () => + { + try + { + // Simulate loading a resource + var wrapper = new ResourceWrapper("id", "1", "Patient", null, null, null, null, null); + var resource = new ImportResource(0, 0, 100, false, false, false, wrapper); + await resourceChannel.Writer.WriteAsync(resource); + } + finally + { + resourceChannel.Writer.Complete(); + } + }); + + return (resourceChannel, loadTask); + }); + + // Simulate constraint violation by throwing InvalidOperationException with constraint violation message + importer.Import(Arg.Any>(), Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(callInfo => + { + throw new InvalidOperationException("Import aborted due to constraint violation. See error logs for details."); + }); + + importErrorStoreFactory.InitializeAsync(Arg.Any(), Arg.Any()) + .Returns(importErrorStore); + + ImportProcessingJob job = new ImportProcessingJob( + mediator, + queueClient, + loader, + importer, + importErrorStoreFactory, + contextAccessor, + loggerFactory, + auditLogger); + + var jobInfo = GetJobInfo(inputData, null); + jobInfo.Id = 1; + jobInfo.GroupId = 1; + jobInfo.Status = JobStatus.Running; + + // Verify that import aborts with constraint violation + await Assert.ThrowsAsync(() => job.ExecuteAsync(jobInfo, CancellationToken.None)); + } + private static JobInfo GetJobInfo(ImportProcessingJobDefinition data, ImportProcessingJobResult result) { var jobInfo = new JobInfo diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs index 71f709a8b2..ffd6f882c1 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Operations/Import/SqlImporter.cs @@ -90,6 +90,16 @@ public async Task Import(Channel input var validResources = resources.Where(r => string.IsNullOrEmpty(r.ImportError)).ToList(); var newErrors = await _store.ImportResourcesAsync(validResources, importMode, allowNegativeVersions, eventualConsistency, cancellationToken); errors.AddRange(newErrors); + + // Abort import on first constraint violation to avoid wasting compute resources + // Constraint violations are identified by checking if the error message contains constraint-related keywords + if (newErrors.Any(e => e.Contains("constraint violation", StringComparison.OrdinalIgnoreCase) || + e.Contains("Database constraint", StringComparison.OrdinalIgnoreCase))) + { + _logger.LogWarning("Constraint violation detected during import. Aborting further processing to conserve resources."); + throw new InvalidOperationException("Import aborted due to constraint violation. See error logs for details."); + } + var totalBytes = resources.Sum(_ => (long)_.Length); resources.Clear(); return (validResources.Count - newErrors.Count, totalBytes); diff --git a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs index 4e872a0972..aa6f38b518 100644 --- a/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs +++ b/src/Microsoft.Health.Fhir.SqlServer/Features/Storage/SqlServerFhirDataStore.cs @@ -11,6 +11,7 @@ using System.Linq; using System.Security.Cryptography; using System.Text; +using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using EnsureThat; @@ -409,6 +410,79 @@ private async Task + /// Extracts constraint violation details from SQL exception and creates error message with resource context. + /// This approach avoids duplicating constraint validation logic between C# and SQL. + /// + /// The SQL exception containing constraint violation details + /// The list of resources being imported when the constraint violation occurred + /// A formatted error message string with constraint details and resource context + private string ExtractConstraintViolationDetails(SqlException sqlException, IReadOnlyList resources) + { + // SQL Server constraint violation error message typically contains: + // - The constraint name + // - The table name + // - The conflicting key value (if available) + string errorMessage = sqlException.Message; + + // Extract relevant constraint information from error message + // Common patterns in SQL constraint violation messages: + // "The INSERT statement conflicted with the FOREIGN KEY constraint..." + // "The INSERT statement conflicted with the CHECK constraint..." + // "Violation of PRIMARY KEY constraint..." + + string constraintType = "unknown"; + string constraintName = "unknown"; + + // Try to extract constraint name using common patterns (both single and double quotes) + var constraintNamePattern = @"constraint [""']([^""']+)[""']"; + + if (errorMessage.Contains("CHECK constraint", StringComparison.OrdinalIgnoreCase)) + { + constraintType = "CHECK constraint"; + var match = Regex.Match(errorMessage, constraintNamePattern, RegexOptions.IgnoreCase); + if (match.Success) + { + constraintName = match.Groups[1].Value; + } + } + else if (errorMessage.Contains("FOREIGN KEY constraint", StringComparison.OrdinalIgnoreCase)) + { + constraintType = "FOREIGN KEY constraint"; + var match = Regex.Match(errorMessage, constraintNamePattern, RegexOptions.IgnoreCase); + if (match.Success) + { + constraintName = match.Groups[1].Value; + } + } + else if (errorMessage.Contains("PRIMARY KEY constraint", StringComparison.OrdinalIgnoreCase)) + { + constraintType = "PRIMARY KEY constraint"; + var match = Regex.Match(errorMessage, constraintNamePattern, RegexOptions.IgnoreCase); + if (match.Success) + { + constraintName = match.Groups[1].Value; + } + } + + // Build informative error message with resource context + var resourceInfo = resources.Count == 1 + ? $"resource at index {resources[0].Index}" + : $"batch of {resources.Count} resources (indices {resources.First().Index} to {resources.Last().Index})"; + + var detailedError = $"Database constraint violation ({constraintType}: {constraintName}) in {resourceInfo}. " + + $"SQL Error: {errorMessage}"; + + _logger.LogError("Constraint violation during import: {ConstraintType} {ConstraintName} for {ResourceInfo}", + constraintType, constraintName, resourceInfo); + + // Serialize error for the first resource in the batch (since we're aborting on first error) + return _importErrorSerializer.Serialize( + resources.First().Index, + detailedError, + resources.First().Offset); + } + internal async Task> ImportResourcesAsync(IReadOnlyList resources, ImportMode importMode, bool allowNegativeVersions, bool eventualConsistency, CancellationToken cancellationToken) { if (resources.Count == 0) // do not go to the database @@ -429,6 +503,19 @@ internal async Task> ImportResourcesAsync(IReadOnlyList { constraintError }; + } + if (sqlEx != null && sqlEx.Number == SqlErrorCodes.Conflict && retries++ < maxRetries) { _logger.LogWarning(e, $"Error on {nameof(ImportResourcesInternalAsync)} retries={{Retries}} resources={{Resources}}", retries, resources.Count); @@ -473,23 +560,9 @@ List GetErrors(IReadOnlyCollection dups, IReadOnlyCollec var loaded = new List(); var conflicts = new List(); - // Check for constraint violations - var constraintCheckresults = ValidateConstraintViolations(resources); - - // Check if there are any conflicts while constraint checks - if (constraintCheckresults.Conflicts.Any()) - { - conflicts.AddRange(constraintCheckresults.Conflicts); - } - - // 2. Subsequent processing should only operate on the validated resources. - if (!constraintCheckresults.ValidResources.Any()) - { - return (loaded, conflicts); // All resources had conflicts, so we can stop here. - } - - // 3. Shadow the 'resources' variable to ensure the rest of the method uses the validated list. - resources = constraintCheckresults.ValidResources; + // Note: Constraint validation is now performed by SQL database. + // If a constraint violation occurs, it will be caught as a SqlException + // with error code 547, and we will handle it in the catch block above. if (importMode == ImportMode.InitialLoad) { @@ -726,168 +799,6 @@ async Task MergeUnversioned(List inputs, bool keepLastUpdated, b loaded.AddRange(inputNoConflict); } - // Validate if there are any constraint violations - (List ValidResources, List Conflicts) ValidateConstraintViolations(IReadOnlyList resources) - { - var validResources = new List(); - var conflicts = new List(); - - var tokenSearchParamRowGenerator = new TokenSearchParamListRowGenerator(_model, _searchParameterTypeMap); - var tokenTokenCompositeSearchParamRowGenerator = new TokenTokenCompositeSearchParamListRowGenerator(_model, tokenSearchParamRowGenerator, _searchParameterTypeMap); - var tokenStringCompositeSearchParamListRowGenerator = new TokenStringCompositeSearchParamListRowGenerator(_model, tokenSearchParamRowGenerator, new StringSearchParamListRowGenerator(_model, _searchParameterTypeMap), _searchParameterTypeMap); - var tokenQuantityCompositeSearchParamListRowGenerator = new TokenQuantityCompositeSearchParamListRowGenerator(_model, tokenSearchParamRowGenerator, new QuantitySearchParamListRowGenerator(_model, _searchParameterTypeMap), _searchParameterTypeMap); - var tokenNumberNumberCompositeSearchParamListRowGenerator = new TokenNumberNumberCompositeSearchParamListRowGenerator(_model, tokenSearchParamRowGenerator, new NumberSearchParamListRowGenerator(_model, _searchParameterTypeMap), _searchParameterTypeMap); - var tokenDateTimeCompositeSearchParamListRowGenerator = new TokenDateTimeCompositeSearchParamListRowGenerator(_model, tokenSearchParamRowGenerator, new DateTimeSearchParamListRowGenerator(_model, _searchParameterTypeMap), _searchParameterTypeMap); - var referenceTokenCompositeSearchParamListRowGenerator = new ReferenceTokenCompositeSearchParamListRowGenerator(_model, new ReferenceSearchParamListRowGenerator(_model, _searchParameterTypeMap), tokenSearchParamRowGenerator, _searchParameterTypeMap); - - // Traverse through the resources to check which resources are causing constraint violation - foreach (var resource in resources) - { - bool hasConflict = false; - - var wrapper = new MergeResourceWrapper(resource.ResourceWrapper, false, false); - - try - { - // TokenSearchParam constraint validation - foreach (var row in tokenSearchParamRowGenerator.GenerateRows(new[] { wrapper })) - { - if (row.Code != null && row.CodeOverflow != null && - Encoding.UTF8.GetByteCount(row.Code) < VLatest.TokenSearchParam.Code.Metadata.MaxLength) - { - resource.ImportError = Resources.TokenSearchParamCodeOverflow; - conflicts.Add(resource); - hasConflict = true; - break; - } - } - - if (hasConflict) - { - continue; // If conflict then move to next resource - } - - // Token-Token Composite search param constraint validation - foreach (var row in tokenTokenCompositeSearchParamRowGenerator.GenerateRows(new[] { wrapper })) - { - // Check if constraint for CodeOverflow1 or CodeOverflow2 is violating - if ((row.CodeOverflow1 != null && Encoding.UTF8.GetByteCount(row.Code1 ?? string.Empty) < VLatest.TokenTokenCompositeSearchParam.Code1.Metadata.MaxLength) || - (row.CodeOverflow2 != null && Encoding.UTF8.GetByteCount(row.Code2 ?? string.Empty) < VLatest.TokenTokenCompositeSearchParam.Code2.Metadata.MaxLength)) - { - resource.ImportError = Resources.TokenTokenCompositeSearchParamCodeOverflow; - conflicts.Add(resource); - hasConflict = true; - break; - } - } - - if (hasConflict) - { - continue; // If conflict then move to next resource - } - - // Token-String Composite search param constraint validation - foreach (var row in tokenStringCompositeSearchParamListRowGenerator.GenerateRows(new[] { wrapper })) - { - if (row.Code1 != null && row.CodeOverflow1 != null && - Encoding.UTF8.GetByteCount(row.Code1) < VLatest.TokenStringCompositeSearchParam.Code1.Metadata.MaxLength) - { - resource.ImportError = Resources.TokenStringCompositeSearchParamCodeOverflow1; - conflicts.Add(resource); - hasConflict = true; - break; - } - } - - if (hasConflict) - { - continue; // If conflict then move to next resource - } - - // Token-Quantity Composite search param constraint validation - foreach (var row in tokenQuantityCompositeSearchParamListRowGenerator.GenerateRows(new[] { wrapper })) - { - if (row.Code1 != null && row.CodeOverflow1 != null && - Encoding.UTF8.GetByteCount(row.Code1) < VLatest.TokenQuantityCompositeSearchParam.Code1.Metadata.MaxLength) - { - resource.ImportError = Resources.TokenQuantityCompositeSearchParamCodeOverflow1; - conflicts.Add(resource); - hasConflict = true; - break; - } - } - - if (hasConflict) - { - continue; // If conflict then move to next resource - } - - // Token-NumberNumber Composite search param constraint validation - foreach (var row in tokenNumberNumberCompositeSearchParamListRowGenerator.GenerateRows(new[] { wrapper })) - { - if (row.Code1 != null && row.CodeOverflow1 != null && - Encoding.UTF8.GetByteCount(row.Code1) < VLatest.TokenNumberNumberCompositeSearchParam.Code1.Metadata.MaxLength) - { - resource.ImportError = Resources.TokenNumberNumberCompositeSearchParamCodeOverflow1; - conflicts.Add(resource); - hasConflict = true; - break; - } - } - - if (hasConflict) - { - continue; // If conflict then move to next resource - } - - // Token-DateTime Composite search param constraint validation - foreach (var row in tokenDateTimeCompositeSearchParamListRowGenerator.GenerateRows(new[] { wrapper })) - { - if (row.Code1 != null && row.CodeOverflow1 != null && - Encoding.UTF8.GetByteCount(row.Code1) < VLatest.TokenDateTimeCompositeSearchParam.Code1.Metadata.MaxLength) - { - resource.ImportError = Resources.TokenDateTimeCompositeSearchParamCodeOverflow1; - conflicts.Add(resource); - hasConflict = true; - break; - } - } - - if (hasConflict) - { - continue; // If conflict then move to next resource - } - - // Reference-Token Composite search param constraint validation - foreach (var row in referenceTokenCompositeSearchParamListRowGenerator.GenerateRows(new[] { wrapper })) - { - if (row.Code2 != null && row.CodeOverflow2 != null && - Encoding.UTF8.GetByteCount(row.Code2) < VLatest.ReferenceTokenCompositeSearchParam.Code2.Metadata.MaxLength) - { - resource.ImportError = Resources.ReferenceTokenCompositeSearchParamCodeOverflow2; - conflicts.Add(resource); - hasConflict = true; - break; - } - } - } - catch (Exception ex) - { - _logger.LogWarning(ex, string.Format(Resources.ExceptionWhileResourceValidation, resource.Index)); - resource.ImportError = string.Format(Resources.ExceptionWhileResourceValidation, resource.Index); - conflicts.Add(resource); - continue; - } - - if (!hasConflict) - { - validResources.Add(resource); - } - } - - return (validResources, conflicts); - } - async Task Merge(IEnumerable resources, bool keepLastUpdated, bool useReplicasForReads) { var input = resources.Select(_ => new ResourceWrapperOperation(_.ResourceWrapper, true, true, null, requireETagOnUpdate: false, keepVersion: _.KeepVersion, bundleResourceContext: null)).ToList();