Skip to content

Commit 992c4ff

Browse files
authored
Spin worker on exception (#1189)
* check the changes and set worker to 1 if tables are absent. * add comments * get chnages query * log error * check for error number * update exception check * add test * undo test * throw exception * update log message * comments * add test * detailed comments * fix compiler error * BracketQuotedFullName
1 parent fceb5d4 commit 992c4ff

File tree

3 files changed

+183
-4
lines changed

3 files changed

+183
-4
lines changed

src/TriggerBinding/SqlTriggerTargetScaler.cs

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
// Copyright (c) Microsoft Corporation. All rights reserved.
22
// Licensed under the MIT License. See License.txt in the project root for license information.
33
using System;
4+
using System.Threading;
45
using System.Threading.Tasks;
56
using Microsoft.Azure.WebJobs.Host.Scale;
7+
using Microsoft.Data.SqlClient;
68
using Microsoft.Extensions.Logging;
9+
using static Microsoft.Azure.WebJobs.Extensions.Sql.SqlTriggerUtils;
710

811
namespace Microsoft.Azure.WebJobs.Extensions.Sql
912
{
@@ -14,24 +17,76 @@ internal sealed class SqlTriggerTargetScaler : ITargetScaler
1417
{
1518
private readonly SqlTriggerMetricsProvider _metricsProvider;
1619
private readonly int _maxChangesPerWorker;
20+
private readonly ILogger _logger;
21+
private readonly string _connectionString;
22+
private readonly SqlObject _userTable;
23+
private static readonly DateTime _firstTableCreationWarmupAttempt = DateTime.MinValue;
24+
1725

1826
public SqlTriggerTargetScaler(string userFunctionId, SqlObject userTable, string userDefinedLeasesTableName, string connectionString, int maxChangesPerWorker, ILogger logger)
1927
{
2028
this._metricsProvider = new SqlTriggerMetricsProvider(connectionString, logger, userTable, userFunctionId, userDefinedLeasesTableName);
2129
this.TargetScalerDescriptor = new TargetScalerDescriptor(userFunctionId);
2230
this._maxChangesPerWorker = maxChangesPerWorker;
31+
this._logger = logger ?? throw new ArgumentNullException(nameof(logger));
32+
this._connectionString = !string.IsNullOrEmpty(connectionString) ? connectionString : throw new ArgumentNullException(nameof(connectionString));
33+
this._userTable = userTable ?? throw new ArgumentNullException(nameof(userTable));
2334
}
2435

2536
public TargetScalerDescriptor TargetScalerDescriptor { get; }
2637

2738
public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext context)
2839
{
29-
SqlTriggerMetrics metrics = await this._metricsProvider.GetMetricsAsync();
40+
try
41+
{
42+
SqlTriggerMetrics metrics = await this._metricsProvider.GetMetricsAsync();
43+
44+
// Instance concurrency value is set by the functions host when dynamic concurrency is enabled. See https://learn.microsoft.com/en-us/azure/azure-functions/functions-concurrency for more details.
45+
int concurrency = context.InstanceConcurrency ?? this._maxChangesPerWorker;
3046

31-
// Instance concurrency value is set by the functions host when dynamic concurrency is enabled. See https://learn.microsoft.com/en-us/azure/azure-functions/functions-concurrency for more details.
32-
int concurrency = context.InstanceConcurrency ?? this._maxChangesPerWorker;
47+
return GetScaleResultInternal(concurrency, metrics.UnprocessedChangeCount);
48+
}
49+
catch (Exception ex)
50+
{
51+
// If the exception is SQL exception and indicates that the object name is invalid, it means that the global state and leases table are not created
52+
// Check for the error number 208 https://learn.microsoft.com/en-us/sql/relational-databases/errors-events/mssqlserver-208-database-engine-error?view=sql-server-ver17
53+
if (ex is SqlException sqlEx && sqlEx.Number == 208)
54+
{
55+
// If it's been 2 minutes since we first spun up the worker and the table still isn't created then stop trying
56+
// since it likely means something else is wrong we can't fix automatically, and we don't want to leave an
57+
// instance running forever.
58+
this._logger.LogWarning("Invalid object name detected. SQL trigger tables not found.");
59+
if (_firstTableCreationWarmupAttempt != DateTime.MinValue && DateTime.UtcNow - _firstTableCreationWarmupAttempt > TimeSpan.FromMinutes(2))
60+
{
61+
this._logger.LogWarning("Returning 0 as the target worker count since the GetMetrics query threw an 'Invalid object name detected' error and we've exceeded the warmup period for scaling up a new instance to create the required state tables.");
62+
return new TargetScalerResult
63+
{
64+
TargetWorkerCount = 0
65+
};
66+
}
67+
else
68+
{
69+
// Check if there are any changes in the user table. Since we don't have a leases table that means
70+
// we haven't processed any of the changes yet so we can just check if there's any changes
71+
// for the table at all (no last sync point)
72+
int changes = await GetChangeCountFromChangeTrackingAsync(this._connectionString, this._userTable, this._logger, CancellationToken.None);
73+
// If there are changes in the user table, we spin up worker(s) to start handling those changes.
74+
// This will also create the global state and leases table, which will allow the scaling logic to start working as intended.
75+
if (changes > 0)
76+
{
77+
this._logger.LogWarning("There are changes in the change-tracking table for the user table, but the global state and leases table are not created. Spinning up worker instances to create those tables and start processing changes.");
78+
return new TargetScalerResult
79+
{
80+
TargetWorkerCount = (int)Math.Ceiling(changes / (decimal)(context.InstanceConcurrency ?? this._maxChangesPerWorker))
81+
};
82+
}
83+
}
3384

34-
return GetScaleResultInternal(concurrency, metrics.UnprocessedChangeCount);
85+
}
86+
// If the exception is not related to the invalid object name Or if there are no changes in the change tracking table for the user table yet.
87+
this._logger.LogError("An error occurred while getting the scale result for SQL trigger. Exception: {ExceptionMessage}", ex.Message);
88+
throw;
89+
}
3590
}
3691

3792
internal static TargetScalerResult GetScaleResultInternal(int concurrency, long unprocessedChangeCount)

src/TriggerBinding/SqlTriggerUtils.cs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,5 +126,35 @@ internal static string GetBracketedLeasesTableName(string userDefinedLeasesTable
126126
return string.IsNullOrEmpty(userDefinedLeasesTableName) ? string.Format(CultureInfo.InvariantCulture, LeasesTableNameFormat, $"{userFunctionId}_{userTableId}") :
127127
string.Format(CultureInfo.InvariantCulture, UserDefinedLeasesTableNameFormat, $"{userDefinedLeasesTableName.AsBracketQuotedString()}");
128128
}
129+
130+
/// <summary>
131+
/// Returns the number of changes in the change tracking table of ID of the user table.
132+
/// </summary>
133+
/// <param name="connectionString">SQL connection string used to connect to user database</param>
134+
/// <param name="userTable">SqlObject user table</param>
135+
/// <param name="logger">Facilitates logging of messages</param>
136+
/// <param name="cancellationToken">Cancellation token to pass to the command</param>
137+
/// <exception cref="InvalidOperationException">Thrown in case of error when querying the object ID for the user table</exception>
138+
internal static async Task<int> GetChangeCountFromChangeTrackingAsync(string connectionString, SqlObject userTable, ILogger logger, CancellationToken cancellationToken)
139+
{
140+
string getChangeCountCommand = $"SELECT COUNT_BIG(*) FROM CHANGETABLE(CHANGES {userTable.BracketQuotedFullName}, null) AS ChTbl;";
141+
using (var connection = new SqlConnection(connectionString))
142+
{
143+
connection.Open();
144+
using (var getChangesCount = new SqlCommand(getChangeCountCommand, connection))
145+
{
146+
object result = await getChangesCount.ExecuteScalarAsyncWithLogging(logger, cancellationToken, true);
147+
if (result is DBNull)
148+
{
149+
logger.LogError($"GetNumberOfChangesAsync: Could not find table: '{userTable.FullName}' or no changes found.");
150+
return 0;
151+
}
152+
int changeCount = Convert.ToInt32(result, CultureInfo.InvariantCulture);
153+
logger.LogDebug($"GetNumberOfChanges ChangeCount={changeCount}");
154+
connection.Close();
155+
return changeCount;
156+
}
157+
}
158+
}
129159
}
130160
}

test/Integration/SqlTriggerBindingIntegrationTests.cs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -697,6 +697,100 @@ public async Task ScaleHostEndToEndTest()
697697

698698
}
699699

700+
/// <summary>
701+
/// Tests that the Scale Controller is able to scale out the workers on Flex Consumption.
702+
/// </summary>
703+
[Fact]
704+
public async Task FlexScaleHostEndToEndTest()
705+
{
706+
string TestFunctionName = "TestFunction";
707+
string ConnectionStringName = "SqlConnectionString";
708+
IConfiguration configuration = new ConfigurationBuilder().Build();
709+
710+
string hostJson =
711+
/*lang = json */
712+
@"{
713+
""azureWebJobs"" : {
714+
""extensions"": {
715+
""sql"": {
716+
""MaxChangesPerWorker"" : 10
717+
}
718+
}
719+
}
720+
}";
721+
string sqlTriggerJson = $@"{{
722+
""name"": ""{TestFunctionName}"",
723+
""type"": ""sqlTrigger"",
724+
""tableName"": ""[dbo].[Products]"",
725+
""connectionStringSetting"": ""{ConnectionStringName}"",
726+
""userFunctionId"" : ""testFunctionId""
727+
}}";
728+
var triggerMetadata = new TriggerMetadata(JObject.Parse(sqlTriggerJson));
729+
730+
this.SetChangeTrackingForTable("Products");
731+
732+
IHost host = new HostBuilder().ConfigureServices(services => services.AddAzureClientsCore()).Build();
733+
AzureComponentFactory defaultAzureComponentFactory = host.Services.GetService<AzureComponentFactory>();
734+
735+
string hostId = "test-host";
736+
var loggerProvider = new TestLoggerProvider();
737+
738+
IHostBuilder hostBuilder = new HostBuilder();
739+
hostBuilder.ConfigureLogging(configure =>
740+
{
741+
configure.SetMinimumLevel(LogLevel.Debug);
742+
configure.AddProvider(loggerProvider);
743+
});
744+
hostBuilder.ConfigureAppConfiguration((hostBuilderContext, config) =>
745+
{
746+
// Adding host.json here
747+
config.AddJsonStream(new MemoryStream(Encoding.UTF8.GetBytes(hostJson)));
748+
749+
var settings = new Dictionary<string, string>()
750+
{
751+
{ ConnectionStringName, this.DbConnectionString },
752+
{ "Microsoft.Azure.WebJobs.Extensions.Sql", "1" }
753+
};
754+
755+
// Adding app setting
756+
config.AddInMemoryCollection(settings);
757+
})
758+
.ConfigureServices(services =>
759+
{
760+
services.AddAzureClientsCore();
761+
services.AddAzureStorageScaleServices();
762+
})
763+
.ConfigureWebJobsScale((context, builder) =>
764+
{
765+
builder.AddSql();
766+
builder.UseHostId(hostId);
767+
builder.AddSqlScaleForTrigger(triggerMetadata);
768+
},
769+
scaleOptions =>
770+
{
771+
scaleOptions.IsTargetScalingEnabled = true;
772+
scaleOptions.MetricsPurgeEnabled = false;
773+
scaleOptions.ScaleMetricsMaxAge = TimeSpan.FromMinutes(4);
774+
scaleOptions.IsRuntimeScalingEnabled = true;
775+
scaleOptions.ScaleMetricsSampleInterval = TimeSpan.FromSeconds(1);
776+
});
777+
778+
IHost scaleHost = hostBuilder.Build();
779+
await scaleHost.StartAsync();
780+
781+
int firstId = 1;
782+
int lastId = 30;
783+
this.InsertProducts(firstId, lastId);
784+
785+
IScaleStatusProvider scaleManager = scaleHost.Services.GetService<IScaleStatusProvider>();
786+
AggregateScaleStatus scaleStatus = await scaleManager.GetScaleStatusAsync(new ScaleStatusContext());
787+
788+
Assert.Equal(ScaleVote.ScaleOut, scaleStatus.Vote);
789+
Assert.Equal(3, scaleStatus.TargetWorkerCount);
790+
791+
await scaleHost.StopAsync();
792+
}
793+
700794
/// <summary>
701795
/// Tests that when using an unsupported database the expected error is thrown
702796
/// </summary>

0 commit comments

Comments
 (0)