Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,37 @@ public async Task GaePlatform_NoConfiguredProjectId()
Assert.Equal("gae_project_id", uploadedEntries[0].LogNameAsLogName.ProjectId);
}

[Fact]
public async Task Flush_AwaitsAllTasks()
{
var pendingUpload = new TaskCompletionSource<WriteLogEntriesResponse>();
int uploadCount = 0;

await RunTest(
entries => Interlocked.Increment(ref uploadCount) == 1
? Task.FromResult(new WriteLogEntriesResponse()) // Msg 1: Completes immediately
: pendingUpload.Task, // Msg 2: Remains pending
async target =>
{
var logger = LogManager.GetLogger("testlogger");
logger.Info("Msg 1");
logger.Info("Msg 2");

var flushFinished = new TaskCompletionSource<bool>();
target.Flush(_ => flushFinished.SetResult(true));

// Verify Flush is still waiting for Msg 2.
await Task.Delay(100);
Assert.False(flushFinished.Task.IsCompleted, "Flush must wait for pending chained uploads.");

// Complete Msg 2; Flush should now finish.
pendingUpload.SetResult(new WriteLogEntriesResponse());
await flushFinished.Task.WaitAsync(TimeSpan.FromSeconds(2));

Assert.Equal(2, uploadCount);
});
}

private class ProblemTypeBase
{
internal const string RegularPropertyValue = "Regular property value";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@ public partial class GoogleStackdriverTarget : TargetWithContext
private long _pendingTaskCount;
private CancellationTokenSource _cancelTokenSource;
private Func<object, Value> _jsonConvertFunction;
private readonly Func<Task, object, Task> _writeLogEntriesBegin;
private readonly Action<Task, object> _writeLogEntriesCompleted;
Comment thread
robertvoinescu-work marked this conversation as resolved.

/// <summary>
/// Construct a Google Cloud loggin target.
Expand All @@ -81,8 +79,6 @@ internal GoogleStackdriverTarget(LoggingServiceV2Client client, Platform platfor
_contextProperties = new List<TargetPropertyWithContext>();
_client = client;
_platform = platform;
_writeLogEntriesBegin = WriteLogEntriesBegin;
_writeLogEntriesCompleted = WriteLogEntriesCompleted;
}

/// <summary>
Expand Down Expand Up @@ -426,29 +422,41 @@ private void WriteLogEntries(IList<LogEntry> logEntries, object continuationList
InternalLogger.Info("GoogleStackdriver(Name={0}): Throttle timeout but {1} tasks are still pending", Name, _pendingTaskCount);
}
}

if (withinTaskLimit && _prevTask != null)
{
_prevTask = _prevTask.ContinueWith(_writeLogEntriesBegin, logEntries, _cancelTokenSource.Token);
}
else
{
_prevTask = WriteLogEntriesBegin(null, logEntries);
}

_prevTask = _prevTask.ContinueWith(_writeLogEntriesCompleted, continuationList);
_prevTask = ChainNextWriteAsync(_prevTask);
}
catch (Exception ex)
{
Interlocked.Decrement(ref _pendingTaskCount);
InternalLogger.Error(ex, "GoogleStackdriver(Name={0}): Failed to begin writing {1} LogEntries", Name, logEntries.Count);
throw;
}

async Task ChainNextWriteAsync(Task previousTask)
{
if (withinTaskLimit && previousTask != null)
{
await previousTask.ConfigureAwait(false);
}

// Return the completed task to WriteLogEntriesCompleted, which handles the results and any exceptions.
Task writeTask = WriteLogEntriesBegin(logEntries);
try
{
await writeTask.ConfigureAwait(false);
}
catch
{
// Exception is handled in WriteLogEntriesCompleted.
}
finally
{
WriteLogEntriesCompleted(writeTask, continuationList);
}
}
}

private async Task WriteLogEntriesBegin(Task _, object state)
private async Task WriteLogEntriesBegin(IList<LogEntry> logEntries)
{
var logEntries = state as IList<LogEntry>;
await _client.WriteLogEntriesAsync(_logNameToWrite, _resource, s_emptyLabels, logEntries, _cancelTokenSource.Token).ConfigureAwait(false);
}

Expand Down
Loading