Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 40 additions & 22 deletions common/src/main/java/org/jspace/RemoteSpace.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.jspace.gate.ClientGate;
import org.jspace.gate.GateFactory;
Expand All @@ -39,16 +41,20 @@
*
*/
public class RemoteSpace implements Space {

private final URI uri;
private final ClientGate gate;

private Map<Long, ClientGate> threadSpaceMap;

public RemoteSpace( URI uri ) throws UnknownHostException, IOException {
this.uri = uri;
this.gate = GateFactory.getInstance().getGateBuilder( uri.getScheme() ).createClientGate(uri);
this.gate.open();
ClientGate gate = GateFactory.getInstance().getGateBuilder( uri.getScheme() ).createClientGate(uri);
gate.open();

this.threadSpaceMap = new HashMap<>();
this.threadSpaceMap.put(Thread.currentThread().getId(), gate);
}


public RemoteSpace(String uri) throws UnknownHostException, IOException {
this(URI.create(uri));
Expand All @@ -62,14 +68,14 @@ public int size() {
}

@Override
public boolean put(Object ... fields) throws InterruptedException {
public boolean put(Object ... fields) throws InterruptedException {
ServerMessage response;
try {
response = gate.send(ClientMessage.putRequest(new Tuple(fields)));
response = getGate().send(ClientMessage.putRequest(new Tuple(fields)));
} catch (IOException e) {
//TODO: Replace with a specific exception
throw new InterruptedException(e.getMessage());
}
}
return response.isSuccessful();
}

Expand All @@ -86,11 +92,11 @@ public Object[] getp(TemplateField ... fields) throws InterruptedException {
private Object[] _get(Template template, boolean isBlocking) throws InterruptedException {
ServerMessage response;
try {
response = gate.send(ClientMessage.getRequest(template,isBlocking,false));
response = getGate().send(ClientMessage.getRequest(template,isBlocking,false));
} catch (IOException e) {
//TODO: Replace with a specific exception
throw new InterruptedException(e.getMessage());
}
}
if (response.isSuccessful()) {
List<Object[]> tuples = response.getTuples();
if (tuples.size()==0) {
Expand All @@ -105,15 +111,15 @@ private Object[] _get(Template template, boolean isBlocking) throws InterruptedE
public List<Object[]> getAll(TemplateField ... fields) throws InterruptedException {
ServerMessage response;
try {
response = gate.send(ClientMessage.getRequest(new Template(fields),false,true));
response = getGate().send(ClientMessage.getRequest(new Template(fields),false,true));
} catch (IOException e) {
//TODO: Replace with a specific exception
throw new InterruptedException(e.getMessage());
}
if (response.isSuccessful()) {
return response.getTuples();
}
return null;
}
return null;
}


Expand All @@ -130,10 +136,10 @@ public Object[] queryp(TemplateField ... fields) throws InterruptedException {
private Object[] _query(Template template, boolean isBlocking) throws InterruptedException {
ServerMessage response;
try {
response = gate.send(ClientMessage.queryRequest(template,isBlocking,false));
response = getGate().send(ClientMessage.queryRequest(template,isBlocking,false));
} catch (IOException e) {
throw new InterruptedException(e.getMessage());
}
}
if (response.isSuccessful()) {
List<Object[]> tuples = response.getTuples();
if (tuples.size()==0) {
Expand All @@ -148,14 +154,14 @@ private Object[] _query(Template template, boolean isBlocking) throws Interrupte
public List<Object[]> queryAll(TemplateField ... fields) throws InterruptedException {
ServerMessage response;
try {
response = gate.send(ClientMessage.queryRequest(new Template(fields),false,true));
response = getGate().send(ClientMessage.queryRequest(new Template(fields),false,true));
} catch (IOException e) {
//TODO: Replace with a specific exception
throw new InterruptedException(e.getMessage());
}
if (response.isSuccessful()) {
return response.getTuples();
}
}
return null;
}

Expand All @@ -164,8 +170,24 @@ public URI getUri() {
return uri;
}

private ClientGate getGate() {
long threadID = Thread.currentThread().getId();
if (!this.threadSpaceMap.containsKey(threadID)) {
ClientGate gate = null;
try {
gate = openNewGate();
} catch (IOException e) {
// TODO: Use log
e.printStackTrace();
}
this.threadSpaceMap.put(threadID, gate);
}
return this.threadSpaceMap.get(threadID);
}

public ClientGate getGate() {
private ClientGate openNewGate() throws IOException {
ClientGate gate = GateFactory.getInstance().getGateBuilder( uri.getScheme() ).createClientGate(uri);
gate.open();
return gate;
}

Expand All @@ -180,8 +202,4 @@ public ClientGate getGate() {
// // TODO Auto-generated method stub
// return null;
// }

public void close() throws IOException {
gate.close();
}
}