Skip to content

Commit 56690bc

Browse files
SergeyRyabininsbiscigl
authored andcommitted
Fix CRT HTTP Client transcribe streaming
1 parent 6bbfc3a commit 56690bc

File tree

1 file changed

+34
-12
lines changed

1 file changed

+34
-12
lines changed

src/aws-cpp-sdk-core/source/http/crt/CRTHttpClient.cpp

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,21 @@ static const char *const CRT_HTTP_CLIENT_TAG = "CRTHttpClient";
2020
// Adapts AWS SDK input streams and rate limiters to the CRT input stream reading model.
2121
class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream {
2222
public:
23-
SDKAdaptingInputStream(const std::shared_ptr<Aws::Utils::RateLimits::RateLimiterInterface>& rateLimiter, std::shared_ptr<Aws::Crt::Io::IStream> stream,
24-
const Aws::Http::HttpClient& client, const Aws::Http::HttpRequest& request, bool isStreaming,
25-
Aws::Crt::Allocator *allocator = Aws::Crt::ApiAllocator()) noexcept :
26-
Aws::Crt::Io::StdIOStreamInputStream(std::move(stream), allocator), m_rateLimiter(rateLimiter),
27-
m_client(client), m_currentRequest(request), m_isStreaming(isStreaming), m_chunkEnd(false)
23+
SDKAdaptingInputStream(const std::shared_ptr<Aws::Utils::RateLimits::RateLimiterInterface>& rateLimiter,
24+
std::shared_ptr<Aws::Crt::Io::IStream> stream,
25+
const Aws::Http::HttpClient& client,
26+
const Aws::Http::HttpRequest& request,
27+
bool isStreaming,
28+
Aws::Crt::Allocator* allocator = Aws::Crt::ApiAllocator()) noexcept :
29+
Aws::Crt::Io::StdIOStreamInputStream(std::move(stream), allocator),
30+
m_rateLimiter(rateLimiter),
31+
m_client(client),
32+
m_currentRequest(request),
33+
m_isStreaming(isStreaming),
34+
m_chunkEnd(false)
2835
{
2936
}
37+
3038
protected:
3139

3240
bool ReadImpl(Aws::Crt::ByteBuf &buffer) noexcept override
@@ -56,7 +64,18 @@ class SDKAdaptingInputStream : public Aws::Crt::Io::StdIOStreamInputStream {
5664

5765
// now do the read. We may over read by an IO buffer size, but it's fine. The throttle will still
5866
// kick-in in plenty of time.
59-
bool retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadImpl(buffer);
67+
bool retValue = false;
68+
if (!m_isStreaming)
69+
{
70+
retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadImpl(buffer);
71+
}
72+
else
73+
{
74+
if (StdIOStreamInputStream::GetStatusImpl().is_valid && StdIOStreamInputStream::PeekImpl() != std::char_traits<char>::eof())
75+
{
76+
retValue = Aws::Crt::Io::StdIOStreamInputStream::ReadSomeImpl(buffer);
77+
}
78+
}
6079
size_t newPos = buffer.len;
6180
AWS_ASSERT(newPos >= currentPos && "the buffer length should not have decreased in value.");
6281

@@ -161,10 +180,10 @@ class AsyncWaiter
161180
m_cvar.wait(uniqueLocker, [this](){return m_wakeupIntentional;});
162181
}
163182

164-
bool WaitOnCompletionUntil(std::chrono::time_point<std::chrono::high_resolution_clock> until)
183+
bool WaitOnCompletionFor(const size_t ms)
165184
{
166185
std::unique_lock<std::mutex> uniqueLocker(m_lock);
167-
return m_cvar.wait_until(uniqueLocker, until, [this](){return m_wakeupIntentional;});
186+
return m_cvar.wait_for(uniqueLocker, std::chrono::milliseconds(ms), [this](){return m_wakeupIntentional;});
168187
}
169188

170189
private:
@@ -431,9 +450,14 @@ namespace Aws
431450

432451
// This will arrive at or around the same time as the headers. Use it to set the response code on the response
433452
requestOptions.onIncomingHeadersBlockDone =
434-
[response](Crt::Http::HttpStream& stream, enum aws_http_header_block block)
453+
[request, response](Crt::Http::HttpStream& stream, enum aws_http_header_block block)
435454
{
436455
OnIncomingHeadersBlockDone(stream, block, response);
456+
auto& headersHandler = request->GetHeadersReceivedEventHandler();
457+
if (headersHandler)
458+
{
459+
headersHandler(request.get(), response.get());
460+
}
437461
};
438462

439463
// CRT client is async only so we'll need to do the synchronous part ourselves.
@@ -467,9 +491,7 @@ namespace Aws
467491
// all that effort if that's the worst thing that can happen?
468492
if (m_configuration.requestTimeoutMs > 0 )
469493
{
470-
auto requestExpiryTime = std::chrono::high_resolution_clock::now() +
471-
std::chrono::milliseconds(m_configuration.requestTimeoutMs);
472-
waiterTimedOut = !waiter.WaitOnCompletionUntil(requestExpiryTime);
494+
waiterTimedOut = !waiter.WaitOnCompletionFor(m_configuration.requestTimeoutMs);
473495

474496
// if this is true, the waiter timed out without a terminal condition being woken up.
475497
if (waiterTimedOut)

0 commit comments

Comments
 (0)