Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ public class AbisMiddleWareStage extends MosipVerticleAPIManager {
private static final String ABIS_QUEUE_NOT_FOUND = "ABIS_QUEUE_NOT_FOUND";
private static final String TEXT_MESSAGE = "text";

/** Set the Consumer Count which required to listen and process message parallel. */
@Value("${mosip.regproc.abis.middleware.activemq.consumer.count:1}")
private Integer consumerCount;

/**
* Get all the abis queue details,register listener to outbound queue's
*/
Expand All @@ -189,7 +193,7 @@ public void setListener(Message message) {
}
}
};
mosipQueueManager.consume(queue, abisQueue.getOutboundQueueName(), listener);
mosipQueueManager.consume(queue, abisQueue.getOutboundQueueName(), listener, consumerCount);
}

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class AbisMessageQueueImpl {
/** The is connection. */
boolean isConnection = false;

/** consumer Count */
private Integer consumerCoint = 1;

/**
* Run abis queue.
*
Expand All @@ -96,7 +99,7 @@ public void setListener(Message message) {
}
};
mosipQueueManager.consume(abisQueueDetails.get(i).getMosipQueue(),
abisQueueDetails.get(i).getInboundQueueName(), listener);
abisQueueDetails.get(i).getInboundQueueName(), listener, consumerCoint);
}

isConnection = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ public class ManualAdjudicationStage extends MosipVerticleAPIManager {

private static final String APPLICATION_JSON = "application/json";

/** Set the Consumer Count which required to listen and process message parallel. */
@Value("${mosip.regproc.manual.adjudication.activemq.consumer.count:1}")
private Integer consumerCount;

/**
* Deploy stage.
*/
Expand All @@ -169,7 +173,7 @@ public void setListener(Message message) {
}
};

mosipQueueManager.consume(queue, mvResponseAddress, listener);
mosipQueueManager.consume(queue, mvResponseAddress, listener, consumerCount);

} else {
throw new QueueConnectionNotFound(PlatformErrorMessages.RPR_PRT_QUEUE_CONNECTION_NULL.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void setUp() throws java.io.IOException, ApisResourceAccessException, Pac
//Mockito.when(env.getProperty(SwaggerConstant.SERVER_SERVLET_PATH)).thenReturn("/registrationprocessor/v1/manualverification");
Mockito.when(mosipConnectionFactory.createConnection(any(), any(), any(), any(), anyList()))
.thenReturn(mosipQueue);
Mockito.doReturn(new String("str").getBytes()).when(mosipQueueManager).consume(any(), any(), any());
Mockito.doReturn(new String("str").getBytes()).when(mosipQueueManager).consume(any(), any(), any(), any());
Mockito.doNothing().when(router).setRoute(any());
Mockito.when(router.post(any())).thenReturn(null);
Mockito.when(router.get(any())).thenReturn(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ public class VerificationStage extends MosipVerticleAPIManager {
@Value("${mosip.regproc.verification.message.expiry-time-limit}")
private Long messageExpiryTimeLimit;

/** Set the Consumer Count which required to listen and process message parallel. */
@Value("${mosip.regproc.verification.activemq.consumer.count:1}")
private Integer consumerCount;

private static final String APPLICATION_JSON = "application/json";

/**
Expand All @@ -170,7 +174,7 @@ public void setListener(Message message) {
}
};

mosipQueueManager.consume(queue, mvResponseAddress, listener);
mosipQueueManager.consume(queue, mvResponseAddress, listener,consumerCount);

} else {
throw new QueueConnectionNotFound(PlatformErrorMessages.RPR_PRT_QUEUE_CONNECTION_NULL.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ public void setUp() throws java.io.IOException, ApisResourceAccessException, Pac
//Mockito.when(env.getProperty(SwaggerConstant.SERVER_SERVLET_PATH)).thenReturn("/registrationprocessor/v1/manualverification");
Mockito.when(mosipConnectionFactory.createConnection(any(), any(), any(), any(), anyList()))
.thenReturn(mosipQueue);
Mockito.doReturn(new String("str").getBytes()).when(mosipQueueManager).consume(any(), any(), any());
Mockito.doReturn(new String("str").getBytes()).when(mosipQueueManager).consume(any(), any(), any(), any());
Mockito.doNothing().when(router).setRoute(any());
Mockito.when(router.post(any())).thenReturn(null);
Mockito.when(router.get(any())).thenReturn(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void setListener(Message message) {
}
}
};
mosipQueueManager.consume(mosipQueue, HealthConstant.QUEUE_ADDRESS, listener);
mosipQueueManager.consume(mosipQueue, HealthConstant.QUEUE_ADDRESS, listener, 1);
isConsumerStarted = true;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


/**
Expand Down Expand Up @@ -182,7 +184,7 @@ public Boolean send(MosipQueue mosipQueue, String message, String address, int m
* .lang.Object, java.lang.String)
*/
@Override
public byte[] consume(MosipQueue mosipQueue, String address, QueueListener object) {
public byte[] consume(MosipQueue mosipQueue, String address, QueueListener object, Integer consumerCount) {
regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(),
"", "MosipActiveMqImpl::consume()::entry");

Expand All @@ -195,34 +197,38 @@ public byte[] consume(MosipQueue mosipQueue, String address, QueueListener objec

throw new InvalidConnectionException(PlatformErrorMessages.RPR_MQI_INVALID_CONNECTION.getMessage());
}
if (destination == null) {
if (connection == null) {
setup(mosipActiveMq);
}
MessageConsumer consumer;
try {
if (session == null) {
regProcLogger.error("Session is null. System will retry to create session");
setup(mosipActiveMq);
}
destination = session.createQueue(address);
consumer = session.createConsumer(destination);
consumer.setMessageListener(QueueListenerFactory.getListener(mosipQueue.getQueueName(), object));
} catch (JMSException | NullPointerException e) {
regProcLogger.error("*******CONSUME EXCEPTION *****", "*******CONSUME EXCEPTION *****",
"*******CONSUME EXCEPTION *****", ExceptionUtils.getFullStackTrace(e));
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
"", "MosipActiveMqImpl::consume():: error with error message "
+ PlatformErrorMessages.RPR_MQI_UNABLE_TO_CONSUME_FROM_QUEUE.getMessage());

if (e instanceof NullPointerException && retryCount > 0) {
regProcLogger.warn(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
"", "Could not obtain queue connection. System will retry for "+ retryCount +" more times.");
retryCount = retryCount - 1;
consume(mosipQueue, address, object);
} else {
throw new ConnectionUnavailableException(
PlatformErrorMessages.RPR_MQI_UNABLE_TO_CONSUME_FROM_QUEUE.getMessage());
}

ExecutorService executorService = Executors.newFixedThreadPool(consumerCount);

for(int i = 0; i < consumerCount; i++) {
executorService.submit(() -> {
MessageConsumer consumer;
try {
Session session1 = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination1 = session1.createQueue(address);
consumer = session1.createConsumer(destination1);
consumer.setMessageListener(QueueListenerFactory.getListener(mosipQueue.getQueueName(), object));
} catch (JMSException | NullPointerException e) {
regProcLogger.error("*******CONSUME EXCEPTION *****", "*******CONSUME EXCEPTION *****",
"*******CONSUME EXCEPTION *****", ExceptionUtils.getFullStackTrace(e));
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
"", "MosipActiveMqImpl::consume():: error with error message "
+ PlatformErrorMessages.RPR_MQI_UNABLE_TO_CONSUME_FROM_QUEUE.getMessage());

if (e instanceof NullPointerException && retryCount > 0) {
regProcLogger.warn(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
"", "Could not obtain queue connection. System will retry for "+ retryCount +" more times.");
retryCount = retryCount - 1;
consume(mosipQueue, address, object, consumerCount);
} else {
throw new ConnectionUnavailableException(
PlatformErrorMessages.RPR_MQI_UNABLE_TO_CONSUME_FROM_QUEUE.getMessage());
}
}
});
}
regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(),
"", "MosipActiveMqImpl::consume()::exit");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ public interface MosipQueueManager<T, V>{
* @param address The address
* @return the original message
*/
public V consume(T mosipQueue, String address, QueueListener object);
public V consume(T mosipQueue, String address, QueueListener object, Integer consumerCount);

}