From 652f3e968e4896af22800d5324c352db6c0d6c77 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mads=20Bj=C3=B8rn?= Date: Tue, 7 Jan 2020 17:34:50 +0100 Subject: [PATCH] Updated RemoteSpace to have a gate for each thread, to avoid threads with blocking calls blocking other threads --- .../src/main/java/org/jspace/RemoteSpace.java | 62 ++++++++++++------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/common/src/main/java/org/jspace/RemoteSpace.java b/common/src/main/java/org/jspace/RemoteSpace.java index 0bafdf2..21c9214 100644 --- a/common/src/main/java/org/jspace/RemoteSpace.java +++ b/common/src/main/java/org/jspace/RemoteSpace.java @@ -27,7 +27,9 @@ import java.net.URI; import java.net.UnknownHostException; import java.util.Arrays; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.jspace.gate.ClientGate; import org.jspace.gate.GateFactory; @@ -39,16 +41,20 @@ * */ public class RemoteSpace implements Space { - + private final URI uri; - private final ClientGate gate; + + private Map threadSpaceMap; public RemoteSpace( URI uri ) throws UnknownHostException, IOException { this.uri = uri; - this.gate = GateFactory.getInstance().getGateBuilder( uri.getScheme() ).createClientGate(uri); - this.gate.open(); + ClientGate gate = GateFactory.getInstance().getGateBuilder( uri.getScheme() ).createClientGate(uri); + gate.open(); + + this.threadSpaceMap = new HashMap<>(); + this.threadSpaceMap.put(Thread.currentThread().getId(), gate); } - + public RemoteSpace(String uri) throws UnknownHostException, IOException { this(URI.create(uri)); @@ -62,14 +68,14 @@ public int size() { } @Override - public boolean put(Object ... fields) throws InterruptedException { + public boolean put(Object ... fields) throws InterruptedException { ServerMessage response; try { - response = gate.send(ClientMessage.putRequest(new Tuple(fields))); + response = getGate().send(ClientMessage.putRequest(new Tuple(fields))); } catch (IOException e) { //TODO: Replace with a specific exception throw new InterruptedException(e.getMessage()); - } + } return response.isSuccessful(); } @@ -86,11 +92,11 @@ public Object[] getp(TemplateField ... fields) throws InterruptedException { private Object[] _get(Template template, boolean isBlocking) throws InterruptedException { ServerMessage response; try { - response = gate.send(ClientMessage.getRequest(template,isBlocking,false)); + response = getGate().send(ClientMessage.getRequest(template,isBlocking,false)); } catch (IOException e) { //TODO: Replace with a specific exception throw new InterruptedException(e.getMessage()); - } + } if (response.isSuccessful()) { List tuples = response.getTuples(); if (tuples.size()==0) { @@ -105,15 +111,15 @@ private Object[] _get(Template template, boolean isBlocking) throws InterruptedE public List getAll(TemplateField ... fields) throws InterruptedException { ServerMessage response; try { - response = gate.send(ClientMessage.getRequest(new Template(fields),false,true)); + response = getGate().send(ClientMessage.getRequest(new Template(fields),false,true)); } catch (IOException e) { //TODO: Replace with a specific exception throw new InterruptedException(e.getMessage()); } if (response.isSuccessful()) { return response.getTuples(); - } - return null; + } + return null; } @@ -130,10 +136,10 @@ public Object[] queryp(TemplateField ... fields) throws InterruptedException { private Object[] _query(Template template, boolean isBlocking) throws InterruptedException { ServerMessage response; try { - response = gate.send(ClientMessage.queryRequest(template,isBlocking,false)); + response = getGate().send(ClientMessage.queryRequest(template,isBlocking,false)); } catch (IOException e) { throw new InterruptedException(e.getMessage()); - } + } if (response.isSuccessful()) { List tuples = response.getTuples(); if (tuples.size()==0) { @@ -148,14 +154,14 @@ private Object[] _query(Template template, boolean isBlocking) throws Interrupte public List queryAll(TemplateField ... fields) throws InterruptedException { ServerMessage response; try { - response = gate.send(ClientMessage.queryRequest(new Template(fields),false,true)); + response = getGate().send(ClientMessage.queryRequest(new Template(fields),false,true)); } catch (IOException e) { //TODO: Replace with a specific exception throw new InterruptedException(e.getMessage()); } if (response.isSuccessful()) { return response.getTuples(); - } + } return null; } @@ -164,8 +170,24 @@ public URI getUri() { return uri; } + private ClientGate getGate() { + long threadID = Thread.currentThread().getId(); + if (!this.threadSpaceMap.containsKey(threadID)) { + ClientGate gate = null; + try { + gate = openNewGate(); + } catch (IOException e) { + // TODO: Use log + e.printStackTrace(); + } + this.threadSpaceMap.put(threadID, gate); + } + return this.threadSpaceMap.get(threadID); + } - public ClientGate getGate() { + private ClientGate openNewGate() throws IOException { + ClientGate gate = GateFactory.getInstance().getGateBuilder( uri.getScheme() ).createClientGate(uri); + gate.open(); return gate; } @@ -180,8 +202,4 @@ public ClientGate getGate() { // // TODO Auto-generated method stub // return null; // } - - public void close() throws IOException { - gate.close(); - } }