Skip to content

Commit b3733d5

Browse files
BatchSize -> MaxBatchSize (#668)
* BatchSize -> MaxBatchSize * Add backwards compat support * updates * fix test * Fix one more
1 parent 6dbfb4d commit b3733d5

10 files changed

+102
-49
lines changed

docs/BindingsOverview.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
- [az\_func.GlobalState](#az_funcglobalstate)
1919
- [az\_func.Leases\_\*](#az_funcleases_)
2020
- [Configuration for Trigger Bindings](#configuration-for-trigger-bindings)
21-
- [Sql\_Trigger\_BatchSize](#sql_trigger_batchsize)
21+
- [Sql\_Trigger\_MaxBatchSize](#sql_trigger_maxbatchsize)
2222
- [Sql\_Trigger\_PollingIntervalMs](#sql_trigger_pollingintervalms)
2323
- [Sql\_Trigger\_MaxChangesPerWorker](#sql_trigger_maxchangesperworker)
2424
- [Scaling for Trigger Bindings](#scaling-for-trigger-bindings)
@@ -153,9 +153,9 @@ A row is created for every row in the target table that is modified. These are t
153153
154154
This section goes over some of the configuration values you can use to customize SQL trigger bindings. See [How to Use Azure Function App Settings](https://learn.microsoft.com/azure/azure-functions/functions-how-to-use-azure-function-app-settings) to learn more.
155155
156-
#### Sql_Trigger_BatchSize
156+
#### Sql_Trigger_MaxBatchSize
157157
158-
This controls the number of changes processed at once before being sent to the triggered function.
158+
This controls the maximum number of changes sent to the function during each iteration of the change processing loop.
159159
160160
#### Sql_Trigger_PollingIntervalMs
161161

performance/SqlTriggerPerformance_BatchOverride.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public class SqlTriggerBindingPerformance_BatchOverride : SqlTriggerBindingPerfo
1414
{
1515

1616
[Params(100, 1000)]
17-
public int BatchSize;
17+
public int MaxBatchSize;
1818

1919
[GlobalSetup]
2020
public void GlobalSetup()
@@ -24,7 +24,7 @@ public void GlobalSetup()
2424
nameof(ProductsTrigger),
2525
SupportedLanguages.CSharp,
2626
environmentVariables: new Dictionary<string, string>() {
27-
{ "Sql_Trigger_BatchSize", this.BatchSize.ToString() }
27+
{ "Sql_Trigger_MaxBatchSize", this.MaxBatchSize.ToString() }
2828
});
2929
}
3030

@@ -35,15 +35,15 @@ public void GlobalSetup()
3535
[Arguments(5)]
3636
public async Task Run(double numBatches)
3737
{
38-
int count = (int)(numBatches * this.BatchSize);
38+
int count = (int)(numBatches * this.MaxBatchSize);
3939
await this.WaitForProductChanges(
4040
1,
4141
count,
4242
SqlChangeOperation.Insert,
4343
() => { this.InsertProducts(1, count); return Task.CompletedTask; },
4444
id => $"Product {id}",
4545
id => id * 100,
46-
this.GetBatchProcessingTimeout(1, count, batchSize: this.BatchSize));
46+
this.GetBatchProcessingTimeout(1, count, maxBatchSize: this.MaxBatchSize));
4747
}
4848
}
4949
}

performance/SqlTriggerPerformance_Overrides.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class SqlTriggerPerformance_Overrides : SqlTriggerBindingPerformanceTestB
1616
public int PollingIntervalMs;
1717

1818
[Params(500, 2000)]
19-
public int BatchSize;
19+
public int MaxBatchSize;
2020

2121
[GlobalSetup]
2222
public void GlobalSetup()
@@ -26,7 +26,7 @@ public void GlobalSetup()
2626
nameof(ProductsTrigger),
2727
SupportedLanguages.CSharp,
2828
environmentVariables: new Dictionary<string, string>() {
29-
{ "Sql_Trigger_BatchSize", this.BatchSize.ToString() },
29+
{ "Sql_Trigger_MaxBatchSize", this.MaxBatchSize.ToString() },
3030
{ "Sql_Trigger_PollingIntervalMs", this.PollingIntervalMs.ToString() }
3131
});
3232
}
@@ -37,15 +37,15 @@ public void GlobalSetup()
3737
[Arguments(5)]
3838
public async Task Run(double numBatches)
3939
{
40-
int count = (int)(numBatches * this.BatchSize);
40+
int count = (int)(numBatches * this.MaxBatchSize);
4141
await this.WaitForProductChanges(
4242
1,
4343
count,
4444
SqlChangeOperation.Insert,
4545
() => { this.InsertProducts(1, count); return Task.CompletedTask; },
4646
id => $"Product {id}",
4747
id => id * 100,
48-
this.GetBatchProcessingTimeout(1, count, batchSize: this.BatchSize));
48+
this.GetBatchProcessingTimeout(1, count, maxBatchSize: this.MaxBatchSize));
4949
}
5050
}
5151
}

performance/SqlTriggerPerformance_PollingIntervalOverride.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void GlobalSetup()
3030
[Benchmark]
3131
public async Task Run()
3232
{
33-
int count = SqlTableChangeMonitor<object>.DefaultBatchSize * 2;
33+
int count = SqlTableChangeMonitor<object>.DefaultMaxBatchSize * 2;
3434
await this.WaitForProductChanges(
3535
1,
3636
count,

src/Telemetry/Telemetry.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,7 @@ public enum TelemetryPropertyName
350350
ErrorCode,
351351
ErrorName,
352352
HasIdentityColumn,
353-
HasConfiguredBatchSize,
353+
HasConfiguredMaxBatchSize,
354354
HasConfiguredMaxChangesPerWorker,
355355
HasConfiguredPollingInterval,
356356
LeasesTableName,
@@ -383,6 +383,7 @@ public enum TelemetryMeasureName
383383
GetPrimaryKeysDurationMs,
384384
GetUnprocessedChangesDurationMs,
385385
InsertGlobalStateTableRowDurationMs,
386+
MaxBatchSize,
386387
MaxChangesPerWorker,
387388
NumRows,
388389
PollingIntervalMs,

src/TriggerBinding/SqlTableChangeMonitor.cs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
4545
private const int LeaseRenewalIntervalInSeconds = 15;
4646
private const int MaxRetryReleaseLeases = 3;
4747

48-
public const int DefaultBatchSize = 100;
48+
public const int DefaultMaxBatchSize = 100;
4949
public const int DefaultPollingIntervalMs = 1000;
5050
#endregion Constants
5151

@@ -60,9 +60,9 @@ internal sealed class SqlTableChangeMonitor<T> : IDisposable
6060
private readonly ITriggeredFunctionExecutor _executor;
6161
private readonly ILogger _logger;
6262
/// <summary>
63-
/// Number of changes to process in each iteration of the loop
63+
/// Maximum number of changes to process in each iteration of the loop
6464
/// </summary>
65-
private readonly int _batchSize = DefaultBatchSize;
65+
private readonly int _maxBatchSize = DefaultMaxBatchSize;
6666
/// <summary>
6767
/// Delay in ms between processing each batch of changes
6868
/// </summary>
@@ -131,13 +131,18 @@ public SqlTableChangeMonitor(
131131
this._userTableId = userTableId;
132132
this._telemetryProps = telemetryProps ?? new Dictionary<TelemetryPropertyName, string>();
133133

134-
// Check if there's config settings to override the default batch size/polling interval values
135-
int? configuredBatchSize = configuration.GetValue<int?>(ConfigKey_SqlTrigger_BatchSize);
134+
// Check if there's config settings to override the default max batch size/polling interval values
135+
int? configuredMaxBatchSize = configuration.GetValue<int?>(ConfigKey_SqlTrigger_MaxBatchSize);
136+
// Fall back to original value for backwards compat if the new value isn't specified
137+
if (configuredMaxBatchSize == null)
138+
{
139+
configuredMaxBatchSize = configuration.GetValue<int?>(ConfigKey_SqlTrigger_BatchSize);
140+
}
136141
int? configuredPollingInterval = configuration.GetValue<int?>(ConfigKey_SqlTrigger_PollingInterval);
137-
this._batchSize = configuredBatchSize ?? this._batchSize;
138-
if (this._batchSize <= 0)
142+
this._maxBatchSize = configuredMaxBatchSize ?? this._maxBatchSize;
143+
if (this._maxBatchSize <= 0)
139144
{
140-
throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_BatchSize}'. Ensure that the value is a positive integer.");
145+
throw new InvalidOperationException($"Invalid value for configuration setting '{ConfigKey_SqlTrigger_MaxBatchSize}'. Ensure that the value is a positive integer.");
141146
}
142147
this._pollingIntervalInMs = configuredPollingInterval ?? this._pollingIntervalInMs;
143148
if (this._pollingIntervalInMs <= 0)
@@ -147,17 +152,17 @@ public SqlTableChangeMonitor(
147152
TelemetryInstance.TrackEvent(
148153
TelemetryEventName.TriggerMonitorStart,
149154
new Dictionary<TelemetryPropertyName, string>(telemetryProps) {
150-
{ TelemetryPropertyName.HasConfiguredBatchSize, (configuredBatchSize != null).ToString() },
155+
{ TelemetryPropertyName.HasConfiguredMaxBatchSize, (configuredMaxBatchSize != null).ToString() },
151156
{ TelemetryPropertyName.HasConfiguredPollingInterval, (configuredPollingInterval != null).ToString() },
152157
},
153158
new Dictionary<TelemetryMeasureName, double>() {
154-
{ TelemetryMeasureName.BatchSize, this._batchSize },
159+
{ TelemetryMeasureName.MaxBatchSize, this._maxBatchSize },
155160
{ TelemetryMeasureName.PollingIntervalMs, this._pollingIntervalInMs }
156161
}
157162
);
158163

159164
// Prep search-conditions that will be used besides WHERE clause to match table rows.
160-
this._rowMatchConditions = Enumerable.Range(0, this._batchSize)
165+
this._rowMatchConditions = Enumerable.Range(0, this._maxBatchSize)
161166
.Select(rowIndex => string.Join(" AND ", this._primaryKeyColumns.Select((col, colIndex) => $"{col.name.AsBracketQuotedString()} = @{rowIndex}_{colIndex}")))
162167
.ToList();
163168

@@ -253,7 +258,7 @@ public async Task<long> GetUnprocessedChangeCountAsync()
253258
/// </summary>
254259
private async Task RunChangeConsumptionLoopAsync()
255260
{
256-
this._logger.LogInformationWithThreadId($"Starting change consumption loop. BatchSize: {this._batchSize} PollingIntervalMs: {this._pollingIntervalInMs}");
261+
this._logger.LogInformationWithThreadId($"Starting change consumption loop. MaxBatchSize: {this._maxBatchSize} PollingIntervalMs: {this._pollingIntervalInMs}");
257262

258263
try
259264
{
@@ -875,7 +880,7 @@ private SqlCommand BuildGetChangesCommand(SqlConnection connection, SqlTransacti
875880
FROM {GlobalStateTableName}
876881
WHERE UserFunctionID = '{this._userFunctionId}' AND UserTableID = {this._userTableId};
877882
878-
SELECT TOP {this._batchSize}
883+
SELECT TOP {this._maxBatchSize}
879884
{selectList},
880885
c.{SysChangeVersionColumnName},
881886
c.SYS_CHANGE_OPERATION,

src/TriggerBinding/SqlTriggerConstants.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ internal static class SqlTriggerConstants
2626
LeasesTableLeaseExpirationTimeColumnName
2727
};
2828

29+
/// <summary>
30+
/// Deprecated config value for MaxBatchSize, kept for backwards compat reasons
31+
/// </summary>
2932
public const string ConfigKey_SqlTrigger_BatchSize = "Sql_Trigger_BatchSize";
33+
public const string ConfigKey_SqlTrigger_MaxBatchSize = "Sql_Trigger_MaxBatchSize";
3034
public const string ConfigKey_SqlTrigger_PollingInterval = "Sql_Trigger_PollingIntervalMs";
3135
public const string ConfigKey_SqlTrigger_MaxChangesPerWorker = "Sql_Trigger_MaxChangesPerWorker";
3236

test/Integration/SqlTriggerBindingIntegrationTestBase.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,17 +181,17 @@ void OutputHandler(object sender, DataReceivedEventArgs e)
181181

182182
/// <summary>
183183
/// Gets a timeout value to use when processing the given number of changes, based on the
184-
/// default batch size and polling interval.
184+
/// default max batch size and polling interval.
185185
/// </summary>
186186
/// <param name="firstId">The first ID in the batch to process</param>
187187
/// <param name="lastId">The last ID in the batch to process</param>
188-
/// <param name="batchSize">The batch size if different than the default batch size</param>
188+
/// <param name="maxBatchSize">The max batch size if different than the default max batch size</param>
189189
/// <param name="pollingIntervalMs">The polling interval in ms if different than the default polling interval</param>
190190
/// <returns></returns>
191-
public int GetBatchProcessingTimeout(int firstId, int lastId, int batchSize = SqlTableChangeMonitor<object>.DefaultBatchSize, int pollingIntervalMs = SqlTableChangeMonitor<object>.DefaultPollingIntervalMs)
191+
public int GetBatchProcessingTimeout(int firstId, int lastId, int maxBatchSize = SqlTableChangeMonitor<object>.DefaultMaxBatchSize, int pollingIntervalMs = SqlTableChangeMonitor<object>.DefaultPollingIntervalMs)
192192
{
193193
int changesToProcess = lastId - firstId + 1;
194-
int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / batchSize // The number of batches to process
194+
int calculatedTimeout = (int)(Math.Ceiling((double)changesToProcess / maxBatchSize // The number of batches to process
195195
/ this.FunctionHostList.Count) // The number of function host processes
196196
* pollingIntervalMs // The length to process each batch
197197
* 2); // Double to add buffer time for processing results & writing log messages

test/Integration/SqlTriggerBindingIntegrationTests.cs

Lines changed: 58 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -73,33 +73,34 @@ await this.WaitForProductChanges(
7373
}
7474

7575
/// <summary>
76-
/// Verifies that manually setting the batch size correctly changes the number of changes processed at once
76+
/// Verifies that manually setting the batch size using the original config var correctly changes the
77+
/// number of changes processed at once.
7778
/// </summary>
7879
[Fact]
7980
public async Task BatchSizeOverrideTriggerTest()
8081
{
8182
// Use enough items to require 4 batches to be processed but then
82-
// set the batch size to the same value so they can all be processed in one
83+
// set the max batch size to the same value so they can all be processed in one
8384
// batch. The test will only wait for ~1 batch worth of time so will timeout
84-
// if the batch size isn't actually changed
85-
const int batchSize = SqlTableChangeMonitor<object>.DefaultBatchSize * 4;
85+
// if the max batch size isn't actually changed
86+
const int maxBatchSize = SqlTableChangeMonitor<object>.DefaultMaxBatchSize * 4;
8687
const int firstId = 1;
87-
const int lastId = batchSize;
88+
const int lastId = maxBatchSize;
8889
this.SetChangeTrackingForTable("Products");
8990
var taskCompletionSource = new TaskCompletionSource<bool>();
9091
DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler(
9192
taskCompletionSource,
92-
@"Starting change consumption loop. BatchSize: (\d*) PollingIntervalMs: \d*",
93-
"BatchSize",
94-
batchSize.ToString());
93+
@"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*",
94+
"MaxBatchSize",
95+
maxBatchSize.ToString());
9596
this.StartFunctionHost(
9697
nameof(ProductsTriggerWithValidation),
9798
SupportedLanguages.CSharp,
9899
useTestFolder: true,
99100
customOutputHandler: handler,
100101
environmentVariables: new Dictionary<string, string>() {
101-
{ "TEST_EXPECTED_BATCH_SIZE", batchSize.ToString() },
102-
{ "Sql_Trigger_BatchSize", batchSize.ToString() }
102+
{ "TEST_EXPECTED_MAX_BATCH_SIZE", maxBatchSize.ToString() },
103+
{ "Sql_Trigger_BatchSize", maxBatchSize.ToString() } // Use old BatchSize config
103104
}
104105
);
105106

@@ -110,8 +111,50 @@ await this.WaitForProductChanges(
110111
() => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; },
111112
id => $"Product {id}",
112113
id => id * 100,
113-
this.GetBatchProcessingTimeout(firstId, lastId, batchSize: batchSize));
114-
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for BatchSize configuration message");
114+
this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize));
115+
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for MaxBatchSize configuration message");
116+
}
117+
118+
/// <summary>
119+
/// Verifies that manually setting the max batch size correctly changes the number of changes processed at once
120+
/// </summary>
121+
[Fact]
122+
public async Task MaxBatchSizeOverrideTriggerTest()
123+
{
124+
// Use enough items to require 4 batches to be processed but then
125+
// set the max batch size to the same value so they can all be processed in one
126+
// batch. The test will only wait for ~1 batch worth of time so will timeout
127+
// if the max batch size isn't actually changed
128+
const int maxBatchSize = SqlTableChangeMonitor<object>.DefaultMaxBatchSize * 4;
129+
const int firstId = 1;
130+
const int lastId = maxBatchSize;
131+
this.SetChangeTrackingForTable("Products");
132+
var taskCompletionSource = new TaskCompletionSource<bool>();
133+
DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler(
134+
taskCompletionSource,
135+
@"Starting change consumption loop. MaxBatchSize: (\d*) PollingIntervalMs: \d*",
136+
"MaxBatchSize",
137+
maxBatchSize.ToString());
138+
this.StartFunctionHost(
139+
nameof(ProductsTriggerWithValidation),
140+
SupportedLanguages.CSharp,
141+
useTestFolder: true,
142+
customOutputHandler: handler,
143+
environmentVariables: new Dictionary<string, string>() {
144+
{ "TEST_EXPECTED_MAX_BATCH_SIZE", maxBatchSize.ToString() },
145+
{ "Sql_Trigger_MaxBatchSize", maxBatchSize.ToString() }
146+
}
147+
);
148+
149+
await this.WaitForProductChanges(
150+
firstId,
151+
lastId,
152+
SqlChangeOperation.Insert,
153+
() => { this.InsertProducts(firstId, lastId); return Task.CompletedTask; },
154+
id => $"Product {id}",
155+
id => id * 100,
156+
this.GetBatchProcessingTimeout(firstId, lastId, maxBatchSize: maxBatchSize));
157+
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for MaxBatchSize configuration message");
115158
}
116159

117160
/// <summary>
@@ -124,13 +167,13 @@ public async Task PollingIntervalOverrideTriggerTest()
124167
// Use enough items to require 5 batches to be processed - the test will
125168
// only wait for the expected time and timeout if the default polling
126169
// interval isn't actually modified.
127-
const int lastId = SqlTableChangeMonitor<object>.DefaultBatchSize * 5;
170+
const int lastId = SqlTableChangeMonitor<object>.DefaultMaxBatchSize * 5;
128171
const int pollingIntervalMs = SqlTableChangeMonitor<object>.DefaultPollingIntervalMs / 2;
129172
this.SetChangeTrackingForTable("Products");
130173
var taskCompletionSource = new TaskCompletionSource<bool>();
131174
DataReceivedEventHandler handler = TestUtils.CreateOutputReceievedHandler(
132175
taskCompletionSource,
133-
@"Starting change consumption loop. BatchSize: \d* PollingIntervalMs: (\d*)",
176+
@"Starting change consumption loop. MaxBatchSize: \d* PollingIntervalMs: (\d*)",
134177
"PollingInterval",
135178
pollingIntervalMs.ToString());
136179
this.StartFunctionHost(
@@ -151,7 +194,7 @@ await this.WaitForProductChanges(
151194
id => $"Product {id}",
152195
id => id * 100,
153196
this.GetBatchProcessingTimeout(firstId, lastId, pollingIntervalMs: pollingIntervalMs));
154-
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5000), "Timed out waiting for PollingInterval configuration message");
197+
await taskCompletionSource.Task.TimeoutAfter(TimeSpan.FromSeconds(5), "Timed out waiting for PollingInterval configuration message");
155198
}
156199

157200
/// <summary>

0 commit comments

Comments
 (0)