Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions blerpc/src/main/java/com/blerpc/BleRpcChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,29 @@ private boolean checkMethodType(RpcCall rpcCall) {
}
}

/**
* Cancel subscription and start unsubscribing from characteristic if there isn't other
* subscriptions for this characteristic.
*
* @param subscriptionToCancel - subscription for cancel.
*/
private void cancelSubscription(SubscriptionCallsGroup subscriptionToCancel) {
workHandler.post(
() -> {
subscriptionToCancel.clearCanceled();
if (!subscriptionToCancel.hasAnySubscriber()
&& subscriptionToCancel.status == SubscriptionStatus.SUBSCRIBED) {
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А что будет, если SUBSCRIBING?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да и UNSIBSCRIBING тоже.

Copy link
Copy Markdown
Collaborator Author

@Nikolas-LFDesigns Nikolas-LFDesigns Jan 28, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Посмотри тест testSubscribeIgnoreCanceledCall, похоже он проходит, а значит всё норм будет.
С unsubscribing еще проще: не даст выполниться, пока не пройдет call отписки.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Я так и не понял, что будет с SUBSCRIBING. Можешь разобраться? Тесты -- это хорошо, но они тестируют только то, что мы предусмотрели и запрограммировали в тестах.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ну смотри, в случае Rx ситуация чисто гипотетическая, ибо мы не сможем отписаться пока не подпишемся, а так произойдет подписка несмотря на cancel, с чем мы тоже ничего не сделаем, ибо процесс writeDescriptor уже запущен и ты его не отменишь в процессе.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А вот и не гипотетическая. Мы можем подписать несколько обзерверов и отписывать их пока подписываем новые. И статус соединения тут будет мигать.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Вот последний и определит исход, ну

так уж если мы хотим отписываться сразу же, это просто норма, не понимаю, зачем с этим что-то делать.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ох, ну опять ты со своим "да все будет ок, расслабься". Это сложный код, который будет нам гейзенбаги выдавать если мы в обработке ошибок напортачим, тут нельзя отмахиваться. Я позже подумаю сам и верифицирую.

startUnsubscribing(subscriptionToCancel);
}
});
}

private void addCall(RpcCall rpcCall) {
calls.add(rpcCall);
if (rpcCall.getMethodType().equals(MethodType.SUBSCRIBE)) {
getSubscriptionForCall(rpcCall).calls.add(rpcCall);
SubscriptionCallsGroup subscription = getSubscriptionForCall(rpcCall);
rpcCall.controller.runOnCancel(unusedObject -> cancelSubscription(subscription));
subscription.calls.add(rpcCall);
}
}

Expand Down Expand Up @@ -437,13 +456,6 @@ private void handleValueChange(BluetoothGatt gatt, BluetoothGattCharacteristic c
return;
}

// If all calls were cancelled, abandon the subscription.
subscription.clearCanceled();
if (!subscription.hasAnySubscriber()) {
startUnsubscribing(subscription);
return;
}

try {
Message response = messageConverter.deserializeResponse(subscription.method, subscription.responsePrototype, characteristic.getValue());
for (RpcCall call : subscription.calls) {
Expand Down
52 changes: 47 additions & 5 deletions blerpc/src/main/java/com/blerpc/BleRpcController.java
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
package com.blerpc;

import com.google.common.base.Optional;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Implementation of the {@link RpcController}.
*/
public class BleRpcController implements RpcController {

private AtomicBoolean canceled = new AtomicBoolean(false);
private boolean canceled;
private boolean failed = false;
private String failMassage = null;
private Optional<RpcCallback<Void>> cancelCallback = Optional.absent();

@Override
public void reset() {
canceled.set(false);
synchronized (this) {
canceled = false;
failed = false;
failMassage = null;
cancelCallback = Optional.absent();
}
}

Expand All @@ -38,7 +40,14 @@ public String errorText() {

@Override
public void startCancel() {
canceled.set(true);
synchronized (this) {
Comment thread
Nikolas-LFDesigns marked this conversation as resolved.
canceled = true;
}

Optional<RpcCallback<Void>> callback = getAndClearCallback();
if (callback.isPresent()) {
callback.get().run(null);
}
}

@Override
Expand All @@ -51,14 +60,47 @@ public void setFailed(String reason) {

@Override
public boolean isCanceled() {
return canceled.get();
synchronized (this) {
return canceled;
}
}

@Override
public void notifyOnCancel(RpcCallback<Object> callback) {
throw new UnsupportedOperationException("Not implemented.");
}

/**
* Assings a custom callback, which will be called once for:
* <ul>
* <li> cancel event;
* <li> dispose event.
* </ul>
*
* @param callback callback for notifying about cancel events
*/
void runOnCancel(RpcCallback<Void> callback) {
if (isCanceled()) {
callback.run(null);
return;
}

synchronized (this) {
cancelCallback = Optional.of(callback);
}
}

private synchronized Optional<RpcCallback<Void>> getAndClearCallback() {
synchronized (this) {
if (!cancelCallback.isPresent()) {
return Optional.absent();
}
Optional<RpcCallback<Void>> callback = cancelCallback;
cancelCallback = Optional.absent();
return callback;
}
}

/**
* A callback that is called when a subscription to BLE characteristic process finished successfully.
* It will always be called exactly once for {@link com.blerpc.proto.MethodType#SUBSCRIBE} methods.
Expand Down
36 changes: 34 additions & 2 deletions blerpc/src/test/java/com/blerpc/BleRpcChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,17 @@ public void testSubscribeSuccess() throws Exception {
verifyNoCalls(callback);
}

@Test
public void testSubscribeSuccess_unsubscribeOnCancel() {
BleRpcController bleRpcController = spy(controller);
callSubscribeMethod(bleRpcController, callback);
verify(bleRpcController).runOnCancel(any(RpcCallback.class));
finishSubscribing(descriptor);

bleRpcController.startCancel();
verifyUnsubscribe(descriptor);
}

@Test
public void testSubscribeCalled_onSubscribeSuccessCalled_notBleRpcController() throws Exception {
RpcController rpcController = spy(controller);
Expand Down Expand Up @@ -781,8 +792,10 @@ public void testSubscribeUnsubscribeWithSubscribers() throws Exception {

@Test
public void testSubscribeToCharacteristicOnlyOnce() throws Exception {
callSubscribeMethod(methodSubscribeChar);
callSubscribeMethod(methodSubscribeCharCopy);
BleRpcController localController1 = spy(controller);
BleRpcController localController2 = spy(controller2);
callSubscribeMethod(methodSubscribeChar, localController1);
callSubscribeMethod(methodSubscribeCharCopy, localController2);
finishConnecting();
verifySubscribe(descriptor);
}
Expand Down Expand Up @@ -853,6 +866,21 @@ public void testUnsubscribe_callSomethingInProcess() throws Exception {
assertCallSucceeded(controller2);
}

@Test
public void testUnsubscribe_onlyOnCancelSecondCall() throws Exception {
BleRpcController localController1 = spy(controller);
BleRpcController localController2 = spy(controller2);
callSubscribeMethod(localController1, callback);
callSubscribeMethod(localController2, callback);
finishSubscribing(descriptor);

localController1.startCancel();
verifyNoUnsubscribe(descriptor);
localController2.startCancel();
onUnsubscribe(descriptor);
verifyUnsubscribed();
}

@Test
public void testValueChangedBeforeOnDescriptorWriteSubscribe() {
callSubscribeMethod(controller, callback);
Expand Down Expand Up @@ -911,6 +939,10 @@ void verifyUnsubscribe(BluetoothGattDescriptor descriptor) {
verify(bluetoothGatt, atLeast(2)).writeDescriptor(descriptor);
}

void verifyNoUnsubscribe(BluetoothGattDescriptor descriptor) {
verify(descriptor, never()).setValue(TEST_DISABLE_NOTIFICATION_VALUE);
}

void assertFailBeforeDiscoveringServices(BleRpcController controller) {
verify(bluetoothGatt, never()).discoverServices();
verifyNoReadWrite();
Expand Down
39 changes: 38 additions & 1 deletion blerpc/src/test/java/com/blerpc/BleRpcControllerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

import static com.blerpc.Assert.assertError;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;

import com.google.protobuf.RpcCallback;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

/**
Expand All @@ -16,8 +19,12 @@ public class BleRpcControllerTest {

private static final String TEST_FAIL_MESSAGE = "TEST_FAIL_MESSAGE";
private static final RpcCallback<Object> TEST_RPC_CALLBACK = parameter -> {

};

@Mock
RpcCallback<Void> cancelCallback;

private final BleRpcController bleRpcController = new BleRpcController();

@Test
Expand All @@ -31,6 +38,24 @@ public void testStartCancel() {
assertThat(bleRpcController.isCanceled()).isTrue();
assertThat(bleRpcController.failed()).isFalse();
assertThat(bleRpcController.errorText()).isNull();
verifyZeroInteractions(cancelCallback);
}

@Test
public void testStartCancel_notify() {
bleRpcController.runOnCancel(cancelCallback);
bleRpcController.startCancel();

verify(cancelCallback).run(null);
}

@Test
public void testStartCancel_notifyTwice_oneCall() {
bleRpcController.runOnCancel(cancelCallback);
bleRpcController.startCancel();
bleRpcController.startCancel();

verify(cancelCallback).run(null);
}

@Test
Expand Down Expand Up @@ -73,10 +98,22 @@ public void testReset() {
}

@Test
public void testNotifyOnCancel_notImplemented() {
public void testNotifyOnCancel() {
assertError(() -> bleRpcController.notifyOnCancel(TEST_RPC_CALLBACK), "Not implemented.");
}

@Test
public void testRunOnCancel() {
bleRpcController.runOnCancel(cancelCallback);
}

@Test
public void testRunOnCancel_immediatelyNotify() {
bleRpcController.startCancel();
bleRpcController.runOnCancel(cancelCallback);
verify(cancelCallback).run(null);
}

private void verifyInitialState() {
assertThat(bleRpcController.isCanceled()).isFalse();
assertThat(bleRpcController.failed()).isFalse();
Expand Down