Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit 1cd56f1

Browse files
author
Alex Walker
authored
Fix session pulses never getting sent (#143)
## What is the goal of this PR? The existing implementation of session renewal was not working, at all. Sessions would simply time out after 10 seconds. We fixed it. ## What are the changes implemented in this PR? Fix session pulses never getting sent
1 parent 70f0ef6 commit 1cd56f1

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

grakn/client.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
#
1919

2020
import grpc
21-
import sched
22-
import time
2321

2422
from grakn.options import GraknOptions
2523
from grakn.rpc.database_manager import DatabaseManager as _DatabaseManager
@@ -36,7 +34,6 @@ class GraknClient(object):
3634
def __init__(self, address=DEFAULT_URI):
3735
self._channel = grpc.insecure_channel(address)
3836
self._databases = _DatabaseManager(self._channel)
39-
self._scheduler = sched.scheduler(time.time, time.sleep)
4037

4138
def session(self, database: str, session_type: SessionType, options=GraknOptions()):
4239
return _Session(self, database, session_type, options)

grakn/rpc/session.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616
# specific language governing permissions and limitations
1717
# under the License.
1818
#
19+
import sched
20+
import time
21+
from threading import Thread
1922

2023
from graknprotocol.protobuf.grakn_pb2_grpc import GraknStub
2124
import graknprotocol.protobuf.session_pb2 as session_proto
@@ -33,9 +36,11 @@ class SessionType(enum.Enum):
3336

3437
class Session(object):
3538

39+
_PULSE_FREQUENCY_SECONDS = 5
40+
3641
def __init__(self, client, database: str, session_type: SessionType, options=GraknOptions()):
3742
self._channel = client._channel
38-
self._scheduler = client._scheduler
43+
self._scheduler = sched.scheduler(time.time, time.sleep)
3944
self._database = database
4045
self._session_type = session_type
4146
self._grpc_stub = GraknStub(self._channel)
@@ -47,7 +52,10 @@ def __init__(self, client, database: str, session_type: SessionType, options=Gra
4752

4853
self._session_id = self._grpc_stub.session_open(open_req).session_id
4954
self._is_open = True
50-
self._pulse = self._scheduler.enter(5, 1, self._transmit_pulse, ())
55+
self._scheduler.enter(delay=self._PULSE_FREQUENCY_SECONDS, priority=1, action=self._transmit_pulse, argument=())
56+
# TODO: This thread blocks the process from closing. We should try cancelling the scheduled task when the
57+
# session closes. If that doesn't work, we need some other way of getting the thread to exit.
58+
Thread(target=self._scheduler.run).start()
5159

5260
def transaction(self, transaction_type: TransactionType, options=GraknOptions()):
5361
return Transaction(self._channel, self._session_id, transaction_type, options)
@@ -70,9 +78,10 @@ def _transmit_pulse(self):
7078
return
7179
pulse_req = session_proto.Session.Pulse.Req()
7280
pulse_req.session_id = self._session_id
73-
is_alive = self._grpc_stub.session_pulse(pulse_req).is_alive
74-
if is_alive:
75-
self._pulse = self._scheduler.enter(5, 1, self._transmit_pulse, ())
81+
res = self._grpc_stub.session_pulse(pulse_req)
82+
if res.alive:
83+
self._scheduler.enter(delay=self._PULSE_FREQUENCY_SECONDS, priority=1, action=self._transmit_pulse, argument=())
84+
Thread(target=self._scheduler.run).start()
7685

7786
def __enter__(self):
7887
return self

0 commit comments

Comments
 (0)