Skip to content

Commit 7bd52e6

Browse files
andrew-signalcody-signal
authored andcommitted
Refactor LibSignalChatConnection to use an explicit queue sendRequest handling while CONNECTING.
1 parent 89944d7 commit 7bd52e6

File tree

4 files changed

+248
-131
lines changed

4 files changed

+248
-131
lines changed

app/src/main/java/org/thoughtcrime/securesms/util/RemoteConfig.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1005,7 +1005,7 @@ object RemoteConfig {
10051005
@JvmStatic
10061006
@get:JvmName("libSignalWebSocketEnabled")
10071007
val libSignalWebSocketEnabled: Boolean by remoteValue(
1008-
key = "android.libsignalWebSocketEnabled.4",
1008+
key = "android.libsignalWebSocketEnabled.5",
10091009
hotSwappable = false
10101010
) { value ->
10111011
value.asBoolean(false) || Environment.IS_NIGHTLY

libsignal-service/src/main/java/org/whispersystems/signalservice/api/websocket/WebSocketConnectionState.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ public enum WebSocketConnectionState {
77
DISCONNECTED,
88
CONNECTING,
99
CONNECTED,
10-
RECONNECTING,
1110
DISCONNECTING,
1211
AUTHENTICATION_FAILED,
1312
REMOTE_DEPRECATED,

libsignal-service/src/main/java/org/whispersystems/signalservice/internal/websocket/LibSignalChatConnection.kt

Lines changed: 150 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,14 @@ class LibSignalChatConnection(
8181
private val nextIncomingMessageInternalPseudoId = AtomicLong(1)
8282
val ackSenderForInternalPseudoId = ConcurrentHashMap<Long, ChatConnectionListener.ServerMessageAck>()
8383

84-
// CHAT_SERVICE_LOCK: Protects state, stateChangedOrMessageReceivedCondition, chatConnection, and
85-
// chatConnectionFuture
84+
private data class RequestAwaitingConnection(
85+
val request: WebSocketRequestMessage,
86+
val timeoutSeconds: Long,
87+
val single: SingleSubject<WebsocketResponse>
88+
)
89+
90+
// CHAT_SERVICE_LOCK: Protects state, stateChangedOrMessageReceivedCondition, chatConnection,
91+
// chatConnectionFuture, and requestsAwaitingConnection.
8692
// stateChangedOrMessageReceivedCondition: derived from CHAT_SERVICE_LOCK, used by readRequest(),
8793
// exists to emulate idiosyncratic behavior of OkHttpWebSocketConnection for readRequest()
8894
// chatConnection: Set only when state == CONNECTED
@@ -92,6 +98,10 @@ class LibSignalChatConnection(
9298
private var chatConnection: ChatConnection? = null
9399
private var chatConnectionFuture: CompletableFuture<out ChatConnection>? = null
94100

101+
// requestsAwaitingConnection should only have contents when we are transitioning to, out of, or are
102+
// in the CONNECTING state.
103+
private val requestsAwaitingConnection = mutableListOf<RequestAwaitingConnection>()
104+
95105
companion object {
96106
const val SERVICE_ENVELOPE_REQUEST_VERB = "PUT"
97107
const val SERVICE_ENVELOPE_REQUEST_PATH = "/api/v1/message"
@@ -133,11 +143,11 @@ class LibSignalChatConnection(
133143
val stateMonitor = state
134144
.skip(1) // Skip the transition to the initial DISCONNECTED state
135145
.subscribe { nextState ->
136-
if (nextState == WebSocketConnectionState.DISCONNECTED) {
137-
cleanup()
138-
}
139-
140146
CHAT_SERVICE_LOCK.withLock {
147+
if (nextState == WebSocketConnectionState.DISCONNECTED) {
148+
cleanup()
149+
}
150+
141151
stateChangedOrMessageReceivedCondition.signalAll()
142152
}
143153
}
@@ -150,6 +160,17 @@ class LibSignalChatConnection(
150160
// there is no ackSender for a pseudoId gracefully in sendResponse.
151161
ackSenderForInternalPseudoId.clear()
152162
// There's no sense in resetting nextIncomingMessageInternalPseudoId.
163+
164+
// This is a belt-and-suspenders check, because the transition handler leaving the CONNECTING
165+
// state should always cleanup the requestsAwaitingConnection, but in case we miss one, log it
166+
// as an error and clean it up gracefully
167+
if (requestsAwaitingConnection.isNotEmpty()) {
168+
Log.w(TAG, "$name [cleanup] ${requestsAwaitingConnection.size} requestsAwaitingConnection during cleanup! This is probably a bug.")
169+
requestsAwaitingConnection.forEach { pending ->
170+
pending.single.onError(SocketException("Connection terminated unexpectedly"))
171+
}
172+
requestsAwaitingConnection.clear()
173+
}
153174
}
154175

155176
init {
@@ -159,6 +180,42 @@ class LibSignalChatConnection(
159180
}
160181
}
161182

183+
private fun sendRequestInternal(request: WebSocketRequestMessage, timeoutSeconds: Long, single: SingleSubject<WebsocketResponse>) {
184+
CHAT_SERVICE_LOCK.withLock {
185+
check(state.value == WebSocketConnectionState.CONNECTED)
186+
187+
val internalRequest = request.toLibSignalRequest(timeout = timeoutSeconds.seconds)
188+
chatConnection!!.send(internalRequest)
189+
.whenComplete(
190+
onSuccess = { response ->
191+
Log.d(TAG, "$name [sendRequest] Success: ${response!!.status}")
192+
when (response.status) {
193+
in 400..599 -> {
194+
healthMonitor.onMessageError(
195+
status = response.status,
196+
isIdentifiedWebSocket = chatConnection is AuthenticatedChatConnection
197+
)
198+
}
199+
}
200+
// Here success means "we received the response" even if it is reporting an error.
201+
// This is consistent with the behavior of the OkHttpWebSocketConnection.
202+
single.onSuccess(response.toWebsocketResponse(isUnidentified = (chatConnection is UnauthenticatedChatConnection)))
203+
},
204+
onFailure = { throwable ->
205+
Log.w(TAG, "$name [sendRequest] Failure:", throwable)
206+
val downstreamThrowable = when (throwable) {
207+
is ConnectionInvalidatedException -> NonSuccessfulResponseCodeException(4401)
208+
// The clients of WebSocketConnection are often sensitive to the exact type of exception returned.
209+
// This is the exception that OkHttpWebSocketConnection throws in the closest scenario to this, when
210+
// the connection fails before the request completes.
211+
else -> SocketException("Failed to get response for request")
212+
}
213+
single.onError(downstreamThrowable)
214+
}
215+
)
216+
}
217+
}
218+
162219
override fun connect(): Observable<WebSocketConnectionState> {
163220
CHAT_SERVICE_LOCK.withLock {
164221
if (!isDead()) {
@@ -175,49 +232,84 @@ class LibSignalChatConnection(
175232
// nullability concern here.
176233
chatConnectionFuture!!.whenComplete(
177234
onSuccess = { connection ->
178-
CHAT_SERVICE_LOCK.withLock {
179-
if (state.value == WebSocketConnectionState.CONNECTING) {
180-
chatConnection = connection
181-
connection?.start()
182-
Log.i(TAG, "$name Connected")
183-
state.onNext(WebSocketConnectionState.CONNECTED)
184-
} else {
185-
Log.i(TAG, "$name Dropped successful connection because we are now ${state.value}")
186-
disconnect()
187-
}
188-
}
235+
handleConnectionSuccess(connection!!)
189236
},
190237
onFailure = { throwable ->
191-
CHAT_SERVICE_LOCK.withLock {
192-
if (throwable is CancellationException) {
193-
// We should have transitioned to DISCONNECTED immediately after we canceled chatConnectionFuture
194-
check(state.value == WebSocketConnectionState.DISCONNECTED)
195-
Log.i(TAG, "$name [connect] cancelled")
196-
return@whenComplete
197-
}
238+
handleConnectionFailure(throwable)
239+
}
240+
)
241+
return state
242+
}
243+
}
198244

199-
Log.w(TAG, "$name [connect] Failure:", throwable)
200-
chatConnection = null
201-
// Internally, libsignal-net will throw this DeviceDeregisteredException when the HTTP CONNECT
202-
// request returns HTTP 403.
203-
// The chat service currently does not return HTTP 401 on /v1/websocket.
204-
// Thus, this currently matches the implementation in OkHttpWebSocketConnection.
205-
when (throwable) {
206-
is DeviceDeregisteredException -> {
207-
state.onNext(WebSocketConnectionState.AUTHENTICATION_FAILED)
208-
}
209-
is AppExpiredException -> {
210-
state.onNext(WebSocketConnectionState.REMOTE_DEPRECATED)
211-
}
212-
else -> {
213-
Log.w(TAG, "Unknown connection failure reason", throwable)
214-
state.onNext(WebSocketConnectionState.FAILED)
215-
}
245+
private fun handleConnectionSuccess(connection: ChatConnection) {
246+
CHAT_SERVICE_LOCK.withLock {
247+
when (state.value) {
248+
WebSocketConnectionState.CONNECTING -> {
249+
chatConnection = connection
250+
chatConnection?.start()
251+
Log.i(TAG, "$name Connected")
252+
state.onNext(WebSocketConnectionState.CONNECTED)
253+
254+
requestsAwaitingConnection.forEach { pending ->
255+
runCatching {
256+
sendRequestInternal(pending.request, pending.timeoutSeconds, pending.single)
257+
}.onFailure { e ->
258+
Log.w(TAG, "$name [sendRequest] Failed to send pending request", e)
259+
pending.single.onError(SocketException("Closed unexpectedly"))
216260
}
217261
}
262+
263+
requestsAwaitingConnection.clear()
218264
}
219-
)
220-
return state
265+
else -> {
266+
Log.i(TAG, "$name Dropped successful connection because we are now ${state.value}")
267+
disconnect()
268+
}
269+
}
270+
}
271+
}
272+
273+
private fun handleConnectionFailure(throwable: Throwable) {
274+
CHAT_SERVICE_LOCK.withLock {
275+
if (throwable is CancellationException) {
276+
// We should have transitioned to DISCONNECTED immediately after we canceled chatConnectionFuture
277+
check(state.value == WebSocketConnectionState.DISCONNECTED)
278+
Log.i(TAG, "$name [connect] cancelled")
279+
return
280+
}
281+
282+
Log.w(TAG, "$name [connect] Failure:", throwable)
283+
chatConnection = null
284+
285+
// Internally, libsignal-net will throw this DeviceDeregisteredException when the HTTP CONNECT
286+
// request returns HTTP 403.
287+
// The chat service currently does not return HTTP 401 on /v1/websocket.
288+
// Thus, this currently matches the implementation in OkHttpWebSocketConnection.
289+
when (throwable) {
290+
is DeviceDeregisteredException -> {
291+
state.onNext(WebSocketConnectionState.AUTHENTICATION_FAILED)
292+
}
293+
is AppExpiredException -> {
294+
state.onNext(WebSocketConnectionState.REMOTE_DEPRECATED)
295+
}
296+
else -> {
297+
Log.w(TAG, "Unknown connection failure reason", throwable)
298+
state.onNext(WebSocketConnectionState.FAILED)
299+
}
300+
}
301+
302+
val downstreamThrowable = when (throwable) {
303+
is DeviceDeregisteredException -> NonSuccessfulResponseCodeException(403)
304+
// This is just to match what OkHttpWebSocketConnection does in the case a pending request fails
305+
// due to the underlying transport refusing to open.
306+
else -> SocketException("Closed unexpectedly")
307+
}
308+
309+
requestsAwaitingConnection.forEach { pending ->
310+
pending.single.onError(downstreamThrowable)
311+
}
312+
requestsAwaitingConnection.clear()
221313
}
222314
}
223315

@@ -231,8 +323,7 @@ class LibSignalChatConnection(
231323
WebSocketConnectionState.REMOTE_DEPRECATED -> true
232324

233325
WebSocketConnectionState.CONNECTING,
234-
WebSocketConnectionState.CONNECTED,
235-
WebSocketConnectionState.RECONNECTING -> false
326+
WebSocketConnectionState.CONNECTED -> false
236327

237328
null -> throw IllegalStateException("LibSignalChatConnection.state can never be null")
238329
}
@@ -285,90 +376,20 @@ class LibSignalChatConnection(
285376

286377
val single = SingleSubject.create<WebsocketResponse>()
287378

288-
if (state.value == WebSocketConnectionState.CONNECTING) {
289-
// In OkHttpWebSocketConnection, if a client calls sendRequest while we are still
290-
// connecting to the Chat service, we queue the request to be sent after the
291-
// the connection is established.
292-
// We carry forward that behavior here, except we have to use future chaining
293-
// rather than directly writing to the connection for it to buffer for us,
294-
// because libsignal-net does not expose a connection handle until the connection
295-
// is established.
296-
Log.i(TAG, "[sendRequest] Enqueuing request send for after connection")
297-
// We are in the CONNECTING state, so our invariant says that chatConnectionFuture should
298-
// be set, so we should not have to worry about nullability here.
299-
chatConnectionFuture!!.whenComplete(
300-
onSuccess = {
301-
// We depend on the libsignal's CompletableFuture's synchronization guarantee to
302-
// keep this implementation simple. If another CompletableFuture implementation is
303-
// used, we'll need to add some logic here to be ensure this completion handler
304-
// fires after the one enqueued in connect().
305-
try {
306-
sendRequest(request).subscribe(
307-
{ response ->
308-
single.onSuccess(response)
309-
},
310-
{ error ->
311-
single.onError(error)
312-
}
313-
)
314-
} catch (e: IOException) {
315-
// We failed to send the request because the connection closed between
316-
// when we got the completion callback and when we got scheduled for
317-
// execution. So, we need to propagate that error downstream, but we
318-
// do not need to worry about pendingResponses, because the response
319-
// single was never added to pendingResponses. (It is only added to
320-
// the set after the request is *successfully* sent off.)
321-
// There's also an additional complication that we know from in-the-field
322-
// crash reports that some downstream consumer of the single's error
323-
// call is not resilient to raw IOExceptions, so we need to again mirror
324-
// the OkHttpWebSocketConnection behavior of passing an explicit
325-
// SocketException instead.
326-
single.onError(SocketException("Closed unexpectedly"))
327-
}
328-
},
329-
onFailure = { throwable ->
330-
// This matches the behavior of OkHttpWebSocketConnection when the connection fails
331-
// before the buffered request can be sent.
332-
val downstreamThrowable = when (throwable) {
333-
is DeviceDeregisteredException -> NonSuccessfulResponseCodeException(403)
334-
else -> SocketException("Closed unexpectedly")
335-
}
336-
single.onError(downstreamThrowable)
337-
}
338-
)
339-
return single.subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
340-
}
341-
342-
val internalRequest = request.toLibSignalRequest(timeout = timeoutSeconds.seconds)
343-
chatConnection!!.send(internalRequest)
344-
.whenComplete(
345-
onSuccess = { response ->
346-
Log.d(TAG, "$name [sendRequest] Success: ${response!!.status}")
347-
when (response.status) {
348-
in 400..599 -> {
349-
healthMonitor.onMessageError(
350-
status = response.status,
351-
isIdentifiedWebSocket = chatConnection is AuthenticatedChatConnection
352-
)
353-
}
354-
}
355-
// Here success means "we received the response" even if it is reporting an error.
356-
// This is consistent with the behavior of the OkHttpWebSocketConnection.
357-
single.onSuccess(response.toWebsocketResponse(isUnidentified = (chatConnection is UnauthenticatedChatConnection)))
358-
},
359-
onFailure = { throwable ->
360-
Log.w(TAG, "$name [sendRequest] Failure:", throwable)
361-
val downstreamThrowable = when (throwable) {
362-
is ConnectionInvalidatedException -> NonSuccessfulResponseCodeException(4401)
363-
// The clients of WebSocketConnection are often sensitive to the exact type of exception returned.
364-
// This is the exception that OkHttpWebSocketConnection throws in the closest scenario to this, when
365-
// the connection fails before the request completes.
366-
else -> SocketException("Failed to get response for request")
367-
}
368-
single.onError(downstreamThrowable)
369-
}
370-
)
371-
return single.subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
379+
return when (state.value) {
380+
WebSocketConnectionState.CONNECTING -> {
381+
Log.i(TAG, "[sendRequest] Enqueuing request send for after connection")
382+
requestsAwaitingConnection.add(RequestAwaitingConnection(request, timeoutSeconds, single))
383+
single
384+
}
385+
WebSocketConnectionState.CONNECTED -> {
386+
sendRequestInternal(request, timeoutSeconds, single)
387+
single
388+
}
389+
else -> {
390+
throw IllegalStateException("LibSignalChatConnection.state was neither dead, CONNECTING, or CONNECTED.")
391+
}
392+
}.subscribeOn(Schedulers.io()).observeOn(Schedulers.io())
372393
}
373394
}
374395

0 commit comments

Comments
 (0)