Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.MDC;
Expand Down Expand Up @@ -146,7 +147,7 @@ public MosipEventBus getEventBus(Object verticleName, String clusterManagerUrl,
VertxOptions options = new VertxOptions().setClustered(true).setClusterManager(clusterManager)
.setHAEnabled(false).setWorkerPoolSize(instanceNumber)
.setEventBusOptions(new EventBusOptions().setPort(getEventBusPort()).setHost(address))
.setMetricsOptions(micrometerMetricsOptions);
.setMetricsOptions(micrometerMetricsOptions).setMaxEventLoopExecuteTime(getMaxEventLoopExecutionTime()).setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS);
Vertx.clusteredVertx(options, result -> {
if (result.succeeded()) {
result.result().deployVerticle((Verticle) verticleName,
Expand Down Expand Up @@ -282,6 +283,10 @@ public Integer getEventBusPort() {
return getIntegerPropertyForSuffix("eventbus.port");
}

public Integer getMaxEventLoopExecutionTime() {
return getIntegerPropertyForSuffix("eventbus.maxloop.exec.time.seconds", 2);
}

public Integer getPort() {
return getIntegerPropertyForSuffix("server.port");
}
Expand All @@ -290,6 +295,10 @@ protected Integer getIntegerPropertyForSuffix(String propSuffix) {
return propertiesUtil.getIntegerProperty(getPropertyPrefix(), propSuffix);
}

protected Integer getIntegerPropertyForSuffix(String propSuffix, Integer defaultValue) {
return propertiesUtil.getProperty(getPropertyPrefix() + propSuffix, Integer.class, defaultValue);
}

protected Boolean getBooleanPropertyForSuffix(String propSuffix, Boolean defaultValue) {
return propertiesUtil.getProperty(getPropertyPrefix() + propSuffix, Boolean.class, defaultValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,7 @@
import java.util.ArrayList;

import brave.Tracing;
import io.mosip.registration.processor.core.tracing.EventTracingHandler;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.logging.SLF4JLogDelegateFactory;
import org.assertj.core.util.Objects;
import org.junit.Assert;
import org.junit.Before;
import org.mockito.Mockito;
import org.springframework.boot.test.mock.mockito.MockBean;

import io.mosip.registration.processor.core.abstractverticle.MessageBusAddress;
import io.mosip.registration.processor.core.abstractverticle.MessageDTO;
Expand Down Expand Up @@ -66,7 +59,7 @@ public URL findUrl()
URL url=loader.getResource("cluster.xml");
return url;
}

@Override
public Integer getEventBusPort() {
return 5711;
Expand All @@ -82,4 +75,8 @@ protected String getPropertyPrefix() {
return EMPTY_STRING;
}

@Override
public Integer getMaxEventLoopExecutionTime() {
return 3;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,7 @@ private KafkaConsumerRecords<String, String> prepareKafkaConsumerRecords(int rec
consumerRecordList.add(
new ConsumerRecord<String,String>(
MessageBusAddress.PACKET_VALIDATOR_BUS_IN.getAddress(), 0, i, "1000"+i,
"{\"rid\":\"1000"+i+"\", \"reg_type\": \"NEW\" }"));
"{\"rid\":\"1000"+i+"\", \"reg_type\": \"NEW\", \"messageBusAddress\" : { \"address\" : \"packet-uploader-new-bus-out\"}}"));
Map<TopicPartition, List<ConsumerRecord<String,String>>> topicPartitionConsumerRecordListMap =
new HashMap<TopicPartition, List<ConsumerRecord<String,String>>>();
topicPartitionConsumerRecordListMap.put(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
package io.mosip.registration.processor.status.dao;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.*;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -83,6 +79,17 @@ public RegistrationStatusEntity save(RegistrationStatusEntity registrationStatus
return registrationStatusRepositary.save(registrationStatusEntity);
}

/**
* Save.
*
* @param registrationStatusEntities
* the registration status entity list
* @return the registration status entity
*/
public List<RegistrationStatusEntity> saveAll(List<RegistrationStatusEntity> registrationStatusEntities) {
return registrationStatusRepositary.saveAll(registrationStatusEntities);
}

/**
* Update.
*
Expand Down Expand Up @@ -247,4 +254,45 @@ public List<RegistrationStatusEntity> findByIdAndProcessAndIteration(String id,
{
return registrationStatusRepositary.getByIdAndProcessAndIteration(id, process, iteration);
}

/**
* Gets the un processed packets.
*
*
* @param processList
* the process List
* @param fetchSize
* the fetch size
* @param elapseTime
* the elapse time
* @param reprocessCount
* the reprocess count
* @param trnStatusList
* the transaction status
* @param excludeStageNames
* the stage which need to exclude
* @param statusList
* the status
* @return the un processed packets
*/
public List<RegistrationStatusEntity> getUnProcessedPackets(List<String> processList, Integer fetchSize, long elapseTime,
Integer reprocessCount, List<String> trnStatusList, List<String> excludeStageNames, List<String> statusList) {

LocalDateTime timeDifference = LocalDateTime.now().minusSeconds(elapseTime);
List<String> statusCodes=new ArrayList<>();
statusCodes.add(RegistrationStatusCode.PAUSED.toString());
statusCodes.add(RegistrationStatusCode.RESUMABLE.toString());
statusCodes.add(RegistrationStatusCode.PAUSED_FOR_ADDITIONAL_INFO.toString());
statusCodes.add(RegistrationStatusCode.REJECTED.toString());
statusCodes.add(RegistrationStatusCode.FAILED.toString());
statusCodes.add(RegistrationStatusCode.PROCESSED.toString());

if(statusList != null && !statusList.isEmpty()) {
return registrationStatusRepositary.getUnProcessedPacketsWithSpecificStatus(processList, trnStatusList, reprocessCount, timeDifference,
statusList, fetchSize, excludeStageNames);
} else {
return registrationStatusRepositary.getUnProcessedPackets(processList, trnStatusList, reprocessCount, timeDifference,
statusCodes, fetchSize, excludeStageNames);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,12 @@ public List<String> getProcessedOrProcessingRegIds(@Param("regIds") List<String>

@Query("SELECT registration FROM RegistrationStatusEntity registration WHERE registration.regId = :regId AND registration.registrationType = :registrationType AND registration.iteration = :iteration")
public List<RegistrationStatusEntity> getByIdAndProcessAndIteration(@Param("regId") String regId, @Param("registrationType") String process, @Param("iteration") int iteration);

@Query(value ="SELECT * FROM registration r WHERE r.process IN :processList AND r.latest_trn_status_code IN :status AND r.reg_process_retry_count<=:reprocessCount AND r.latest_trn_dtimes <:timeDifference AND r.status_code NOT IN :statusCodes AND r.reg_stage_name NOT IN :excludeStageNames order by r.latest_trn_dtimes LIMIT :fetchSize ", nativeQuery = true)
public List<RegistrationStatusEntity> getUnProcessedPackets(@Param("processList") List<String> processes, @Param("status") List<String> status,@Param("reprocessCount") Integer reprocessCount,@Param("timeDifference") LocalDateTime timeDifference,@Param("statusCodes") List<String> statusCodes,@Param("fetchSize") Integer fetchSize,@Param("excludeStageNames") List<String> excludeStageNames);

@Query(value ="SELECT * FROM registration r WHERE r.process IN :processList AND r.latest_trn_status_code IN :status AND r.reg_process_retry_count<=:reprocessCount AND r.latest_trn_dtimes <:timeDifference AND r.status_code IN :statusCodes AND r.reg_stage_name NOT IN :excludeStageNames order by r.latest_trn_dtimes LIMIT :fetchSize ", nativeQuery = true)
public List<RegistrationStatusEntity> getUnProcessedPacketsWithSpecificStatus(@Param("processList") List<String> processes, @Param("status") List<String> status,@Param("reprocessCount") Integer reprocessCount,@Param("timeDifference") LocalDateTime timeDifference,@Param("statusCodes") List<String> statusCodes,@Param("fetchSize") Integer fetchSize,@Param("excludeStageNames") List<String> excludeStageNames);

}

Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.mosip.registration.processor.status.service;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.springframework.data.domain.Page;
import org.springframework.stereotype.Service;
Expand Down Expand Up @@ -164,4 +165,38 @@ public Integer getUnProcessedPacketsCount(long elapseTime, Integer reprocessCoun

public List<InternalRegistrationStatusDto> getResumablePackets(Integer fetchSize);

/**
* Gets the un processed packets.
*
* @param processList
* the process List
* @param fetchSize
* the fetch size
* @param elapseTime
* the elapse time
* @param reprocessCount
* the reprocess count
* @param trnStatusList
* the transaction status
* @param excludeStageNames
* the stage which need to exclude
* @param statusList
* the status
* @return the un processed packets
*/
public CompletableFuture<List<InternalRegistrationStatusDto>> getUnProcessedPackets(List<String> processList, Integer fetchSize, long elapseTime, Integer reprocessCount,
List<String> trnStatusList, List<String> excludeStageNames, List<String> statusList);

/**
* Update registration status for workflow Engine.
*
* @param registrationStatusDtos
* the list of registration status dto
* @param moduleId
* the module id
* @param moduleName
* the module name
*/
public void updateRegistrationStatusForWorkflowEngines(List<U> registrationStatusDtos, String moduleId, String moduleName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ public interface TransactionService<U> {
*/
public TransactionEntity addRegistrationTransaction(U registrationStatusDto);

/**
* Adds the registration transaction.
*
* @param registrationStatusDtoList
* the registration status dto List
* @return the transaction entity
*/
public List<TransactionEntity> addRegistrationTransactions(List<U> registrationStatusDtoList);


/**
* Gets the transaction by reg id and status code.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import io.mosip.registration.processor.core.code.RegistrationTransactionTypeCode;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataAccessException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import io.mosip.kernel.core.dataaccess.exception.DataAccessLayerException;
Expand Down Expand Up @@ -964,5 +965,123 @@ public void updateRegistrationStatusForWorkflow(InternalRegistrationStatusDto re
"RegistrationStatusServiceImpl::updateRegistrationStatusForWorkFlow()::exit");

}


/**
* Gets the un processed packets.
*
* @param processList
* the process List
* @param fetchSize
* the fetch size
* @param elapseTime
* the elapse time
* @param reprocessCount
* the reprocess count
* @param trnStatusList
* the transaction status
* @param excludeStageNames
* the stage which need to exclude
* @param statusList
* the status
* @return the un processed packets
*/
@Async
public CompletableFuture<List<InternalRegistrationStatusDto>> getUnProcessedPackets(List<String> processList, Integer fetchSize, long elapseTime,
Integer reprocessCount, List<String> trnStatusList, List<String> excludeStageNames, List<String> statusList) {

regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(), "",
"RegistrationStatusServiceImpl::getReprocessPacket()::entry");
try {
List<RegistrationStatusEntity> entityList = registrationStatusDao.getUnProcessedPackets(processList, fetchSize,
elapseTime, reprocessCount, trnStatusList, excludeStageNames, statusList);

regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(), "",
"RegistrationStatusServiceImpl::getReprocessPacket()::exit");

return CompletableFuture.completedFuture(convertEntityListToDtoList(entityList));
} catch (DataAccessException | DataAccessLayerException e) {

regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
"", e.getMessage() + ExceptionUtils.getStackTrace(e));
throw new TablenotAccessibleException(
PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e);
}
}

@Override
public void updateRegistrationStatusForWorkflowEngines(List<InternalRegistrationStatusDto> registrationStatusDtos, String moduleId,
String moduleName) {
List<RegistrationStatusEntity> registrationStatusEntities = new ArrayList<>();
List<TransactionDto> transactionDtoList = new ArrayList<>();

registrationStatusDtos.forEach(registrationStatusDto -> {
regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(),
registrationStatusDto.getRegistrationId(),
"RegistrationStatusServiceImpl::updateRegistrationStatus()::entry");
boolean isTransactionSuccessful = false;
LogDescription description = new LogDescription();
String transactionId = generateId();
// String latestTransactionId = getLatestTransactionId(registrationStatusDto.getRegistrationId(),
// registrationStatusDto.getRegistrationType(), registrationStatusDto.getIteration(), registrationStatusDto.getWorkflowInstanceId());
TransactionDto transactionDto = new TransactionDto(transactionId, registrationStatusDto.getRegistrationId(),
registrationStatusDto.getLatestRegistrationTransactionId(), registrationStatusDto.getLatestTransactionTypeCode(),
"updated registration status record", registrationStatusDto.getLatestTransactionStatusCode(),
registrationStatusDto.getStatusComment(), registrationStatusDto.getSubStatusCode());
if (registrationStatusDto.getRefId() == null) {
transactionDto.setReferenceId(registrationStatusDto.getRegistrationId());
} else {
transactionDto.setReferenceId(registrationStatusDto.getRefId());
}

transactionDto.setReferenceIdType("updated registration record");
transactionDtoList.add(transactionDto);

registrationStatusDto.setLatestRegistrationTransactionId(transactionId);
try {
// InternalRegistrationStatusDto dto1 = getRegistrationStatus(registrationStatusDto.getRegistrationId(),
// registrationStatusDto.getRegistrationType(), registrationStatusDto.getIteration(), registrationStatusDto.getWorkflowInstanceId());
// if (dto != null) {
// dto.setUpdateDateTime(LocalDateTime.now(ZoneId.of("UTC")));
RegistrationStatusEntity entity = convertDtoToEntity(registrationStatusDto,
registrationStatusDto.getLastSuccessStageName(), true);
if (entity.getStatusCode() == null) {
entity.setStatusCode(registrationStatusDto.getStatusCode());
}
registrationStatusEntities.add(entity);
isTransactionSuccessful = true;
description.setMessage("Updated registration status successfully");
// }
} catch (DataAccessException | DataAccessLayerException e) {
description.setMessage("DataAccessLayerException while Updating registration status for registration Id"
+ registrationStatusDto.getRegistrationId() + "::" + e.getMessage());

regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
registrationStatusDto.getRegistrationId(), e.getMessage() + ExceptionUtils.getStackTrace(e));
throw new TablenotAccessibleException(
PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e);
} finally {
String eventId = isTransactionSuccessful ? EventId.RPR_407.toString() : EventId.RPR_405.toString();
String eventName = eventId.equalsIgnoreCase(EventId.RPR_407.toString()) ? EventName.UPDATE.toString()
: EventName.EXCEPTION.toString();
String eventType = eventId.equalsIgnoreCase(EventId.RPR_407.toString()) ? EventType.BUSINESS.toString()
: EventType.SYSTEM.toString();

if(!disableAudit)
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType,
moduleId, moduleName, registrationStatusDto.getRegistrationId());

}
regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(),
registrationStatusDto.getRegistrationId(),
"RegistrationStatusServiceImpl::updateRegistrationStatus()::exit");

});

if(!transactionDtoList.isEmpty())
transcationStatusService.addRegistrationTransactions(transactionDtoList);

if(!registrationStatusEntities.isEmpty())
registrationStatusDao.saveAll(registrationStatusEntities);
}

}
Loading