diff --git a/blerpc/src/main/java/com/blerpc/BleRpcChannel.java b/blerpc/src/main/java/com/blerpc/BleRpcChannel.java index b5608f91..6f170ebd 100644 --- a/blerpc/src/main/java/com/blerpc/BleRpcChannel.java +++ b/blerpc/src/main/java/com/blerpc/BleRpcChannel.java @@ -131,10 +131,29 @@ private boolean checkMethodType(RpcCall rpcCall) { } } + /** + * Cancel subscription and start unsubscribing from characteristic if there isn't other + * subscriptions for this characteristic. + * + * @param subscriptionToCancel - subscription for cancel. + */ + private void cancelSubscription(SubscriptionCallsGroup subscriptionToCancel) { + workHandler.post( + () -> { + subscriptionToCancel.clearCanceled(); + if (!subscriptionToCancel.hasAnySubscriber() + && subscriptionToCancel.status == SubscriptionStatus.SUBSCRIBED) { + startUnsubscribing(subscriptionToCancel); + } + }); + } + private void addCall(RpcCall rpcCall) { calls.add(rpcCall); if (rpcCall.getMethodType().equals(MethodType.SUBSCRIBE)) { - getSubscriptionForCall(rpcCall).calls.add(rpcCall); + SubscriptionCallsGroup subscription = getSubscriptionForCall(rpcCall); + rpcCall.controller.runOnCancel(unusedObject -> cancelSubscription(subscription)); + subscription.calls.add(rpcCall); } } @@ -437,13 +456,6 @@ private void handleValueChange(BluetoothGatt gatt, BluetoothGattCharacteristic c return; } - // If all calls were cancelled, abandon the subscription. - subscription.clearCanceled(); - if (!subscription.hasAnySubscriber()) { - startUnsubscribing(subscription); - return; - } - try { Message response = messageConverter.deserializeResponse(subscription.method, subscription.responsePrototype, characteristic.getValue()); for (RpcCall call : subscription.calls) { diff --git a/blerpc/src/main/java/com/blerpc/BleRpcController.java b/blerpc/src/main/java/com/blerpc/BleRpcController.java index 3c29eea9..60aa3cf2 100644 --- a/blerpc/src/main/java/com/blerpc/BleRpcController.java +++ b/blerpc/src/main/java/com/blerpc/BleRpcController.java @@ -1,24 +1,26 @@ package com.blerpc; +import com.google.common.base.Optional; import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; -import java.util.concurrent.atomic.AtomicBoolean; /** * Implementation of the {@link RpcController}. */ public class BleRpcController implements RpcController { - private AtomicBoolean canceled = new AtomicBoolean(false); + private boolean canceled; private boolean failed = false; private String failMassage = null; + private Optional> cancelCallback = Optional.absent(); @Override public void reset() { - canceled.set(false); synchronized (this) { + canceled = false; failed = false; failMassage = null; + cancelCallback = Optional.absent(); } } @@ -38,7 +40,14 @@ public String errorText() { @Override public void startCancel() { - canceled.set(true); + synchronized (this) { + canceled = true; + } + + Optional> callback = getAndClearCallback(); + if (callback.isPresent()) { + callback.get().run(null); + } } @Override @@ -51,7 +60,9 @@ public void setFailed(String reason) { @Override public boolean isCanceled() { - return canceled.get(); + synchronized (this) { + return canceled; + } } @Override @@ -59,6 +70,37 @@ public void notifyOnCancel(RpcCallback callback) { throw new UnsupportedOperationException("Not implemented."); } + /** + * Assings a custom callback, which will be called once for: + *
    + *
  • cancel event; + *
  • dispose event. + *
+ * + * @param callback callback for notifying about cancel events + */ + void runOnCancel(RpcCallback callback) { + if (isCanceled()) { + callback.run(null); + return; + } + + synchronized (this) { + cancelCallback = Optional.of(callback); + } + } + + private synchronized Optional> getAndClearCallback() { + synchronized (this) { + if (!cancelCallback.isPresent()) { + return Optional.absent(); + } + Optional> callback = cancelCallback; + cancelCallback = Optional.absent(); + return callback; + } + } + /** * A callback that is called when a subscription to BLE characteristic process finished successfully. * It will always be called exactly once for {@link com.blerpc.proto.MethodType#SUBSCRIBE} methods. diff --git a/blerpc/src/test/java/com/blerpc/BleRpcChannelTest.java b/blerpc/src/test/java/com/blerpc/BleRpcChannelTest.java index 92c6bb73..cacae89d 100644 --- a/blerpc/src/test/java/com/blerpc/BleRpcChannelTest.java +++ b/blerpc/src/test/java/com/blerpc/BleRpcChannelTest.java @@ -638,6 +638,17 @@ public void testSubscribeSuccess() throws Exception { verifyNoCalls(callback); } + @Test + public void testSubscribeSuccess_unsubscribeOnCancel() { + BleRpcController bleRpcController = spy(controller); + callSubscribeMethod(bleRpcController, callback); + verify(bleRpcController).runOnCancel(any(RpcCallback.class)); + finishSubscribing(descriptor); + + bleRpcController.startCancel(); + verifyUnsubscribe(descriptor); + } + @Test public void testSubscribeCalled_onSubscribeSuccessCalled_notBleRpcController() throws Exception { RpcController rpcController = spy(controller); @@ -781,8 +792,10 @@ public void testSubscribeUnsubscribeWithSubscribers() throws Exception { @Test public void testSubscribeToCharacteristicOnlyOnce() throws Exception { - callSubscribeMethod(methodSubscribeChar); - callSubscribeMethod(methodSubscribeCharCopy); + BleRpcController localController1 = spy(controller); + BleRpcController localController2 = spy(controller2); + callSubscribeMethod(methodSubscribeChar, localController1); + callSubscribeMethod(methodSubscribeCharCopy, localController2); finishConnecting(); verifySubscribe(descriptor); } @@ -853,6 +866,21 @@ public void testUnsubscribe_callSomethingInProcess() throws Exception { assertCallSucceeded(controller2); } + @Test + public void testUnsubscribe_onlyOnCancelSecondCall() throws Exception { + BleRpcController localController1 = spy(controller); + BleRpcController localController2 = spy(controller2); + callSubscribeMethod(localController1, callback); + callSubscribeMethod(localController2, callback); + finishSubscribing(descriptor); + + localController1.startCancel(); + verifyNoUnsubscribe(descriptor); + localController2.startCancel(); + onUnsubscribe(descriptor); + verifyUnsubscribed(); + } + @Test public void testValueChangedBeforeOnDescriptorWriteSubscribe() { callSubscribeMethod(controller, callback); @@ -911,6 +939,10 @@ void verifyUnsubscribe(BluetoothGattDescriptor descriptor) { verify(bluetoothGatt, atLeast(2)).writeDescriptor(descriptor); } + void verifyNoUnsubscribe(BluetoothGattDescriptor descriptor) { + verify(descriptor, never()).setValue(TEST_DISABLE_NOTIFICATION_VALUE); + } + void assertFailBeforeDiscoveringServices(BleRpcController controller) { verify(bluetoothGatt, never()).discoverServices(); verifyNoReadWrite(); diff --git a/blerpc/src/test/java/com/blerpc/BleRpcControllerTest.java b/blerpc/src/test/java/com/blerpc/BleRpcControllerTest.java index 9b51c49e..1451a297 100644 --- a/blerpc/src/test/java/com/blerpc/BleRpcControllerTest.java +++ b/blerpc/src/test/java/com/blerpc/BleRpcControllerTest.java @@ -2,10 +2,13 @@ import static com.blerpc.Assert.assertError; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; import com.google.protobuf.RpcCallback; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; /** @@ -16,8 +19,12 @@ public class BleRpcControllerTest { private static final String TEST_FAIL_MESSAGE = "TEST_FAIL_MESSAGE"; private static final RpcCallback TEST_RPC_CALLBACK = parameter -> { + }; + @Mock + RpcCallback cancelCallback; + private final BleRpcController bleRpcController = new BleRpcController(); @Test @@ -31,6 +38,24 @@ public void testStartCancel() { assertThat(bleRpcController.isCanceled()).isTrue(); assertThat(bleRpcController.failed()).isFalse(); assertThat(bleRpcController.errorText()).isNull(); + verifyZeroInteractions(cancelCallback); + } + + @Test + public void testStartCancel_notify() { + bleRpcController.runOnCancel(cancelCallback); + bleRpcController.startCancel(); + + verify(cancelCallback).run(null); + } + + @Test + public void testStartCancel_notifyTwice_oneCall() { + bleRpcController.runOnCancel(cancelCallback); + bleRpcController.startCancel(); + bleRpcController.startCancel(); + + verify(cancelCallback).run(null); } @Test @@ -73,10 +98,22 @@ public void testReset() { } @Test - public void testNotifyOnCancel_notImplemented() { + public void testNotifyOnCancel() { assertError(() -> bleRpcController.notifyOnCancel(TEST_RPC_CALLBACK), "Not implemented."); } + @Test + public void testRunOnCancel() { + bleRpcController.runOnCancel(cancelCallback); + } + + @Test + public void testRunOnCancel_immediatelyNotify() { + bleRpcController.startCancel(); + bleRpcController.runOnCancel(cancelCallback); + verify(cancelCallback).run(null); + } + private void verifyInitialState() { assertThat(bleRpcController.isCanceled()).isFalse(); assertThat(bleRpcController.failed()).isFalse();