diff --git a/apis/Google.Cloud.Logging.NLog/Google.Cloud.Logging.NLog.Tests/GoogleStackdriverTargetTest.cs b/apis/Google.Cloud.Logging.NLog/Google.Cloud.Logging.NLog.Tests/GoogleStackdriverTargetTest.cs index 4c5cf516b702..9e4264360d2e 100644 --- a/apis/Google.Cloud.Logging.NLog/Google.Cloud.Logging.NLog.Tests/GoogleStackdriverTargetTest.cs +++ b/apis/Google.Cloud.Logging.NLog/Google.Cloud.Logging.NLog.Tests/GoogleStackdriverTargetTest.cs @@ -770,6 +770,37 @@ public async Task GaePlatform_NoConfiguredProjectId() Assert.Equal("gae_project_id", uploadedEntries[0].LogNameAsLogName.ProjectId); } + [Fact] + public async Task Flush_AwaitsAllTasks() + { + var pendingUpload = new TaskCompletionSource(); + int uploadCount = 0; + + await RunTest( + entries => Interlocked.Increment(ref uploadCount) == 1 + ? Task.FromResult(new WriteLogEntriesResponse()) // Msg 1: Completes immediately + : pendingUpload.Task, // Msg 2: Remains pending + async target => + { + var logger = LogManager.GetLogger("testlogger"); + logger.Info("Msg 1"); + logger.Info("Msg 2"); + + var flushFinished = new TaskCompletionSource(); + target.Flush(_ => flushFinished.SetResult(true)); + + // Verify Flush is still waiting for Msg 2. + await Task.Delay(100); + Assert.False(flushFinished.Task.IsCompleted, "Flush must wait for pending chained uploads."); + + // Complete Msg 2; Flush should now finish. + pendingUpload.SetResult(new WriteLogEntriesResponse()); + await flushFinished.Task.WaitAsync(TimeSpan.FromSeconds(2)); + + Assert.Equal(2, uploadCount); + }); + } + private class ProblemTypeBase { internal const string RegularPropertyValue = "Regular property value"; diff --git a/apis/Google.Cloud.Logging.NLog/Google.Cloud.Logging.NLog/GoogleStackdriverTarget.cs b/apis/Google.Cloud.Logging.NLog/Google.Cloud.Logging.NLog/GoogleStackdriverTarget.cs index ee892702f738..0294e1d5446f 100644 --- a/apis/Google.Cloud.Logging.NLog/Google.Cloud.Logging.NLog/GoogleStackdriverTarget.cs +++ b/apis/Google.Cloud.Logging.NLog/Google.Cloud.Logging.NLog/GoogleStackdriverTarget.cs @@ -64,8 +64,6 @@ public partial class GoogleStackdriverTarget : TargetWithContext private long _pendingTaskCount; private CancellationTokenSource _cancelTokenSource; private Func _jsonConvertFunction; - private readonly Func _writeLogEntriesBegin; - private readonly Action _writeLogEntriesCompleted; /// /// Construct a Google Cloud loggin target. @@ -81,8 +79,6 @@ internal GoogleStackdriverTarget(LoggingServiceV2Client client, Platform platfor _contextProperties = new List(); _client = client; _platform = platform; - _writeLogEntriesBegin = WriteLogEntriesBegin; - _writeLogEntriesCompleted = WriteLogEntriesCompleted; } /// @@ -426,17 +422,7 @@ private void WriteLogEntries(IList logEntries, object continuationList InternalLogger.Info("GoogleStackdriver(Name={0}): Throttle timeout but {1} tasks are still pending", Name, _pendingTaskCount); } } - - if (withinTaskLimit && _prevTask != null) - { - _prevTask = _prevTask.ContinueWith(_writeLogEntriesBegin, logEntries, _cancelTokenSource.Token); - } - else - { - _prevTask = WriteLogEntriesBegin(null, logEntries); - } - - _prevTask = _prevTask.ContinueWith(_writeLogEntriesCompleted, continuationList); + _prevTask = ChainNextWriteAsync(_prevTask); } catch (Exception ex) { @@ -444,11 +430,33 @@ private void WriteLogEntries(IList logEntries, object continuationList InternalLogger.Error(ex, "GoogleStackdriver(Name={0}): Failed to begin writing {1} LogEntries", Name, logEntries.Count); throw; } + + async Task ChainNextWriteAsync(Task previousTask) + { + if (withinTaskLimit && previousTask != null) + { + await previousTask.ConfigureAwait(false); + } + + // Return the completed task to WriteLogEntriesCompleted, which handles the results and any exceptions. + Task writeTask = WriteLogEntriesBegin(logEntries); + try + { + await writeTask.ConfigureAwait(false); + } + catch + { + // Exception is handled in WriteLogEntriesCompleted. + } + finally + { + WriteLogEntriesCompleted(writeTask, continuationList); + } + } } - private async Task WriteLogEntriesBegin(Task _, object state) + private async Task WriteLogEntriesBegin(IList logEntries) { - var logEntries = state as IList; await _client.WriteLogEntriesAsync(_logNameToWrite, _resource, s_emptyLabels, logEntries, _cancelTokenSource.Token).ConfigureAwait(false); }