diff --git a/src/main/java/storm/sqs/SqsQueueSpout.java b/src/main/java/storm/sqs/SqsQueueSpout.java index df34169..887ae8b 100644 --- a/src/main/java/storm/sqs/SqsQueueSpout.java +++ b/src/main/java/storm/sqs/SqsQueueSpout.java @@ -1,6 +1,8 @@ package storm.sqs; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.List; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; @@ -55,6 +57,7 @@ public abstract class SqsQueueSpout extends BaseRichSpout { private SpoutOutputCollector collector; private AmazonSQSAsync sqs; + private final String endpoint; private final String queueUrl; private final boolean reliable; private LinkedBlockingQueue queue; @@ -66,6 +69,11 @@ public abstract class SqsQueueSpout extends BaseRichSpout { * @param reliable whether this spout uses Storm's reliability facilities */ public SqsQueueSpout(String queueUrl, boolean reliable) { + try { + this.endpoint = (new URI(queueUrl)).getHost(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Illegal value for queueUrl: "+queueUrl); + } this.queueUrl = queueUrl; this.reliable = reliable; this.sleepTime = 100; @@ -79,6 +87,8 @@ public void open( queue = new LinkedBlockingQueue(); try { sqs = new AmazonSQSAsyncClient(new PropertiesCredentials(SqsQueueSpout.class.getResourceAsStream("/AwsCredentials.properties"))); + // Overrides the default endpoint for this client ("sqs.us-east-1.amazonaws.com"). + sqs.setEndpoint(endpoint); } catch (IOException ioe) { throw new RuntimeException("Couldn't load AWS credentials", ioe); }