diff --git a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/MosipVerticleManager.java b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/MosipVerticleManager.java index 8763a5f2f2f..a58f4dd74b5 100644 --- a/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/MosipVerticleManager.java +++ b/registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/abstractverticle/MosipVerticleManager.java @@ -10,6 +10,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.exception.ExceptionUtils; import org.slf4j.MDC; @@ -146,7 +147,7 @@ public MosipEventBus getEventBus(Object verticleName, String clusterManagerUrl, VertxOptions options = new VertxOptions().setClustered(true).setClusterManager(clusterManager) .setHAEnabled(false).setWorkerPoolSize(instanceNumber) .setEventBusOptions(new EventBusOptions().setPort(getEventBusPort()).setHost(address)) - .setMetricsOptions(micrometerMetricsOptions); + .setMetricsOptions(micrometerMetricsOptions).setMaxEventLoopExecuteTime(getMaxEventLoopExecutionTime()).setMaxEventLoopExecuteTimeUnit(TimeUnit.SECONDS); Vertx.clusteredVertx(options, result -> { if (result.succeeded()) { result.result().deployVerticle((Verticle) verticleName, @@ -282,6 +283,10 @@ public Integer getEventBusPort() { return getIntegerPropertyForSuffix("eventbus.port"); } + public Integer getMaxEventLoopExecutionTime() { + return getIntegerPropertyForSuffix("eventbus.maxloop.exec.time.seconds", 2); + } + public Integer getPort() { return getIntegerPropertyForSuffix("server.port"); } @@ -290,6 +295,10 @@ protected Integer getIntegerPropertyForSuffix(String propSuffix) { return propertiesUtil.getIntegerProperty(getPropertyPrefix(), propSuffix); } + protected Integer getIntegerPropertyForSuffix(String propSuffix, Integer defaultValue) { + return propertiesUtil.getProperty(getPropertyPrefix() + propSuffix, Integer.class, defaultValue); + } + protected Boolean getBooleanPropertyForSuffix(String propSuffix, Boolean defaultValue) { return propertiesUtil.getProperty(getPropertyPrefix() + propSuffix, Boolean.class, defaultValue); } diff --git a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/abstractverticle/ConsumerVerticle.java b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/abstractverticle/ConsumerVerticle.java index 974cf840308..c62507c9497 100644 --- a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/abstractverticle/ConsumerVerticle.java +++ b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/abstractverticle/ConsumerVerticle.java @@ -4,14 +4,7 @@ import java.util.ArrayList; import brave.Tracing; -import io.mosip.registration.processor.core.tracing.EventTracingHandler; -import io.vertx.core.eventbus.EventBus; import io.vertx.core.logging.SLF4JLogDelegateFactory; -import org.assertj.core.util.Objects; -import org.junit.Assert; -import org.junit.Before; -import org.mockito.Mockito; -import org.springframework.boot.test.mock.mockito.MockBean; import io.mosip.registration.processor.core.abstractverticle.MessageBusAddress; import io.mosip.registration.processor.core.abstractverticle.MessageDTO; @@ -66,7 +59,7 @@ public URL findUrl() URL url=loader.getResource("cluster.xml"); return url; } - + @Override public Integer getEventBusPort() { return 5711; @@ -82,4 +75,8 @@ protected String getPropertyPrefix() { return EMPTY_STRING; } + @Override + public Integer getMaxEventLoopExecutionTime() { + return 3; + } } \ No newline at end of file diff --git a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java index 8c35485c68c..2e799141fcf 100644 --- a/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java +++ b/registration-processor/registration-processor-core/src/test/java/io/mosip/registration/processor/core/eventbus/KafkaMosipEventBusTest.java @@ -621,7 +621,7 @@ private KafkaConsumerRecords prepareKafkaConsumerRecords(int rec consumerRecordList.add( new ConsumerRecord( MessageBusAddress.PACKET_VALIDATOR_BUS_IN.getAddress(), 0, i, "1000"+i, - "{\"rid\":\"1000"+i+"\", \"reg_type\": \"NEW\" }")); + "{\"rid\":\"1000"+i+"\", \"reg_type\": \"NEW\", \"messageBusAddress\" : { \"address\" : \"packet-uploader-new-bus-out\"}}")); Map>> topicPartitionConsumerRecordListMap = new HashMap>>(); topicPartitionConsumerRecordListMap.put( diff --git a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/dao/RegistrationStatusDao.java b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/dao/RegistrationStatusDao.java index e5810589190..89c26ec25b9 100644 --- a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/dao/RegistrationStatusDao.java +++ b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/dao/RegistrationStatusDao.java @@ -1,11 +1,7 @@ package io.mosip.registration.processor.status.dao; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; @@ -83,6 +79,17 @@ public RegistrationStatusEntity save(RegistrationStatusEntity registrationStatus return registrationStatusRepositary.save(registrationStatusEntity); } + /** + * Save. + * + * @param registrationStatusEntities + * the registration status entity list + * @return the registration status entity + */ + public List saveAll(List registrationStatusEntities) { + return registrationStatusRepositary.saveAll(registrationStatusEntities); + } + /** * Update. * @@ -247,4 +254,45 @@ public List findByIdAndProcessAndIteration(String id, { return registrationStatusRepositary.getByIdAndProcessAndIteration(id, process, iteration); } + + /** + * Gets the un processed packets. + * + * + * @param processList + * the process List + * @param fetchSize + * the fetch size + * @param elapseTime + * the elapse time + * @param reprocessCount + * the reprocess count + * @param trnStatusList + * the transaction status + * @param excludeStageNames + * the stage which need to exclude + * @param statusList + * the status + * @return the un processed packets + */ + public List getUnProcessedPackets(List processList, Integer fetchSize, long elapseTime, + Integer reprocessCount, List trnStatusList, List excludeStageNames, List statusList) { + + LocalDateTime timeDifference = LocalDateTime.now().minusSeconds(elapseTime); + List statusCodes=new ArrayList<>(); + statusCodes.add(RegistrationStatusCode.PAUSED.toString()); + statusCodes.add(RegistrationStatusCode.RESUMABLE.toString()); + statusCodes.add(RegistrationStatusCode.PAUSED_FOR_ADDITIONAL_INFO.toString()); + statusCodes.add(RegistrationStatusCode.REJECTED.toString()); + statusCodes.add(RegistrationStatusCode.FAILED.toString()); + statusCodes.add(RegistrationStatusCode.PROCESSED.toString()); + + if(statusList != null && !statusList.isEmpty()) { + return registrationStatusRepositary.getUnProcessedPacketsWithSpecificStatus(processList, trnStatusList, reprocessCount, timeDifference, + statusList, fetchSize, excludeStageNames); + } else { + return registrationStatusRepositary.getUnProcessedPackets(processList, trnStatusList, reprocessCount, timeDifference, + statusCodes, fetchSize, excludeStageNames); + } + } } \ No newline at end of file diff --git a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/repositary/RegistrationRepositary.java b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/repositary/RegistrationRepositary.java index 5b9027a921e..08745b2e7ae 100644 --- a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/repositary/RegistrationRepositary.java +++ b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/repositary/RegistrationRepositary.java @@ -64,5 +64,12 @@ public List getProcessedOrProcessingRegIds(@Param("regIds") List @Query("SELECT registration FROM RegistrationStatusEntity registration WHERE registration.regId = :regId AND registration.registrationType = :registrationType AND registration.iteration = :iteration") public List getByIdAndProcessAndIteration(@Param("regId") String regId, @Param("registrationType") String process, @Param("iteration") int iteration); + + @Query(value ="SELECT * FROM registration r WHERE r.process IN :processList AND r.latest_trn_status_code IN :status AND r.reg_process_retry_count<=:reprocessCount AND r.latest_trn_dtimes <:timeDifference AND r.status_code NOT IN :statusCodes AND r.reg_stage_name NOT IN :excludeStageNames order by r.latest_trn_dtimes LIMIT :fetchSize ", nativeQuery = true) + public List getUnProcessedPackets(@Param("processList") List processes, @Param("status") List status,@Param("reprocessCount") Integer reprocessCount,@Param("timeDifference") LocalDateTime timeDifference,@Param("statusCodes") List statusCodes,@Param("fetchSize") Integer fetchSize,@Param("excludeStageNames") List excludeStageNames); + + @Query(value ="SELECT * FROM registration r WHERE r.process IN :processList AND r.latest_trn_status_code IN :status AND r.reg_process_retry_count<=:reprocessCount AND r.latest_trn_dtimes <:timeDifference AND r.status_code IN :statusCodes AND r.reg_stage_name NOT IN :excludeStageNames order by r.latest_trn_dtimes LIMIT :fetchSize ", nativeQuery = true) + public List getUnProcessedPacketsWithSpecificStatus(@Param("processList") List processes, @Param("status") List status,@Param("reprocessCount") Integer reprocessCount,@Param("timeDifference") LocalDateTime timeDifference,@Param("statusCodes") List statusCodes,@Param("fetchSize") Integer fetchSize,@Param("excludeStageNames") List excludeStageNames); + } diff --git a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/RegistrationStatusService.java b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/RegistrationStatusService.java index ac9e84d9b29..7fc19eb8678 100644 --- a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/RegistrationStatusService.java +++ b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/RegistrationStatusService.java @@ -1,6 +1,7 @@ package io.mosip.registration.processor.status.service; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.springframework.data.domain.Page; import org.springframework.stereotype.Service; @@ -164,4 +165,38 @@ public Integer getUnProcessedPacketsCount(long elapseTime, Integer reprocessCoun public List getResumablePackets(Integer fetchSize); + /** + * Gets the un processed packets. + * + * @param processList + * the process List + * @param fetchSize + * the fetch size + * @param elapseTime + * the elapse time + * @param reprocessCount + * the reprocess count + * @param trnStatusList + * the transaction status + * @param excludeStageNames + * the stage which need to exclude + * @param statusList + * the status + * @return the un processed packets + */ + public CompletableFuture> getUnProcessedPackets(List processList, Integer fetchSize, long elapseTime, Integer reprocessCount, + List trnStatusList, List excludeStageNames, List statusList); + + /** + * Update registration status for workflow Engine. + * + * @param registrationStatusDtos + * the list of registration status dto + * @param moduleId + * the module id + * @param moduleName + * the module name + */ + public void updateRegistrationStatusForWorkflowEngines(List registrationStatusDtos, String moduleId, String moduleName); + } diff --git a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/TransactionService.java b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/TransactionService.java index 431a4002fa8..586a38daaec 100644 --- a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/TransactionService.java +++ b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/TransactionService.java @@ -27,6 +27,16 @@ public interface TransactionService { */ public TransactionEntity addRegistrationTransaction(U registrationStatusDto); + /** + * Adds the registration transaction. + * + * @param registrationStatusDtoList + * the registration status dto List + * @return the transaction entity + */ + public List addRegistrationTransactions(List registrationStatusDtoList); + + /** * Gets the transaction by reg id and status code. * diff --git a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/impl/RegistrationStatusServiceImpl.java b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/impl/RegistrationStatusServiceImpl.java index a6c3e1cc4cd..4e9bcc3eb97 100644 --- a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/impl/RegistrationStatusServiceImpl.java +++ b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/impl/RegistrationStatusServiceImpl.java @@ -7,9 +7,9 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import io.mosip.registration.processor.core.code.RegistrationTransactionTypeCode; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -17,6 +17,7 @@ import org.springframework.data.domain.Page; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.PageRequest; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import io.mosip.kernel.core.dataaccess.exception.DataAccessLayerException; @@ -964,5 +965,123 @@ public void updateRegistrationStatusForWorkflow(InternalRegistrationStatusDto re "RegistrationStatusServiceImpl::updateRegistrationStatusForWorkFlow()::exit"); } - + + /** + * Gets the un processed packets. + * + * @param processList + * the process List + * @param fetchSize + * the fetch size + * @param elapseTime + * the elapse time + * @param reprocessCount + * the reprocess count + * @param trnStatusList + * the transaction status + * @param excludeStageNames + * the stage which need to exclude + * @param statusList + * the status + * @return the un processed packets + */ + @Async + public CompletableFuture> getUnProcessedPackets(List processList, Integer fetchSize, long elapseTime, + Integer reprocessCount, List trnStatusList, List excludeStageNames, List statusList) { + + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(), "", + "RegistrationStatusServiceImpl::getReprocessPacket()::entry"); + try { + List entityList = registrationStatusDao.getUnProcessedPackets(processList, fetchSize, + elapseTime, reprocessCount, trnStatusList, excludeStageNames, statusList); + + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(), "", + "RegistrationStatusServiceImpl::getReprocessPacket()::exit"); + + return CompletableFuture.completedFuture(convertEntityListToDtoList(entityList)); + } catch (DataAccessException | DataAccessLayerException e) { + + regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), + "", e.getMessage() + ExceptionUtils.getStackTrace(e)); + throw new TablenotAccessibleException( + PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e); + } + } + + @Override + public void updateRegistrationStatusForWorkflowEngines(List registrationStatusDtos, String moduleId, + String moduleName) { + List registrationStatusEntities = new ArrayList<>(); + List transactionDtoList = new ArrayList<>(); + + registrationStatusDtos.forEach(registrationStatusDto -> { + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(), + registrationStatusDto.getRegistrationId(), + "RegistrationStatusServiceImpl::updateRegistrationStatus()::entry"); + boolean isTransactionSuccessful = false; + LogDescription description = new LogDescription(); + String transactionId = generateId(); +// String latestTransactionId = getLatestTransactionId(registrationStatusDto.getRegistrationId(), +// registrationStatusDto.getRegistrationType(), registrationStatusDto.getIteration(), registrationStatusDto.getWorkflowInstanceId()); + TransactionDto transactionDto = new TransactionDto(transactionId, registrationStatusDto.getRegistrationId(), + registrationStatusDto.getLatestRegistrationTransactionId(), registrationStatusDto.getLatestTransactionTypeCode(), + "updated registration status record", registrationStatusDto.getLatestTransactionStatusCode(), + registrationStatusDto.getStatusComment(), registrationStatusDto.getSubStatusCode()); + if (registrationStatusDto.getRefId() == null) { + transactionDto.setReferenceId(registrationStatusDto.getRegistrationId()); + } else { + transactionDto.setReferenceId(registrationStatusDto.getRefId()); + } + + transactionDto.setReferenceIdType("updated registration record"); + transactionDtoList.add(transactionDto); + + registrationStatusDto.setLatestRegistrationTransactionId(transactionId); + try { + // InternalRegistrationStatusDto dto1 = getRegistrationStatus(registrationStatusDto.getRegistrationId(), + // registrationStatusDto.getRegistrationType(), registrationStatusDto.getIteration(), registrationStatusDto.getWorkflowInstanceId()); + // if (dto != null) { + // dto.setUpdateDateTime(LocalDateTime.now(ZoneId.of("UTC"))); + RegistrationStatusEntity entity = convertDtoToEntity(registrationStatusDto, + registrationStatusDto.getLastSuccessStageName(), true); + if (entity.getStatusCode() == null) { + entity.setStatusCode(registrationStatusDto.getStatusCode()); + } + registrationStatusEntities.add(entity); + isTransactionSuccessful = true; + description.setMessage("Updated registration status successfully"); + // } + } catch (DataAccessException | DataAccessLayerException e) { + description.setMessage("DataAccessLayerException while Updating registration status for registration Id" + + registrationStatusDto.getRegistrationId() + "::" + e.getMessage()); + + regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), + registrationStatusDto.getRegistrationId(), e.getMessage() + ExceptionUtils.getStackTrace(e)); + throw new TablenotAccessibleException( + PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e); + } finally { + String eventId = isTransactionSuccessful ? EventId.RPR_407.toString() : EventId.RPR_405.toString(); + String eventName = eventId.equalsIgnoreCase(EventId.RPR_407.toString()) ? EventName.UPDATE.toString() + : EventName.EXCEPTION.toString(); + String eventType = eventId.equalsIgnoreCase(EventId.RPR_407.toString()) ? EventType.BUSINESS.toString() + : EventType.SYSTEM.toString(); + + if(!disableAudit) + auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType, + moduleId, moduleName, registrationStatusDto.getRegistrationId()); + + } + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(), + registrationStatusDto.getRegistrationId(), + "RegistrationStatusServiceImpl::updateRegistrationStatus()::exit"); + + }); + + if(!transactionDtoList.isEmpty()) + transcationStatusService.addRegistrationTransactions(transactionDtoList); + + if(!registrationStatusEntities.isEmpty()) + registrationStatusDao.saveAll(registrationStatusEntities); + } + } diff --git a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/impl/TransactionServiceImpl.java b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/impl/TransactionServiceImpl.java index 5cb736f3631..1e9b7bded57 100644 --- a/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/impl/TransactionServiceImpl.java +++ b/registration-processor/registration-processor-registration-status-service-impl/src/main/java/io/mosip/registration/processor/status/service/impl/TransactionServiceImpl.java @@ -148,4 +148,32 @@ private RegistrationTransactionDto convertEntityToRegistrationTransactionDto(Tra entity.getParentid(), entity.getStatusCode(), entity.getSubStatusCode(), entity.getStatusComment(), entity.getCreateDateTime()); } + + /* + * (non-Javadoc) + * + * @see io.mosip.registration.processor.status.service.TransactionService# + * addRegistrationTransaction(java.lang.Object) + */ + @Override + public List addRegistrationTransactions(List transactionStatusDtoList) { + List entities = new ArrayList<>(); + transactionStatusDtoList.forEach(transactionStatusDto -> { + try { + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(), + transactionStatusDto.getRegistrationId(), + "TransactionServiceImpl::addRegistrationTransaction()::entry"); + TransactionEntity entity = convertDtoToEntity(transactionStatusDto); + entities.add(entity); + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.USERID.toString(), + transactionStatusDto.getRegistrationId(), + "TransactionServiceImpl::addRegistrationTransaction()::exit"); + } catch (DataAccessLayerException e) { + throw new TransactionTableNotAccessibleException( + PlatformErrorMessages.RPR_RGS_TRANSACTION_TABLE_NOT_ACCESSIBLE.getMessage(), e); + } + }); + + return transactionRepositary.saveAll(entities); + } } diff --git a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/config/ReprocessorConfigBeans.java b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/config/ReprocessorConfigBeans.java index b7794a31d84..cb46bffbd0b 100644 --- a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/config/ReprocessorConfigBeans.java +++ b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/config/ReprocessorConfigBeans.java @@ -8,6 +8,10 @@ import io.mosip.registration.processor.packet.storage.utils.PacketManagerService; import io.mosip.registration.processor.reprocessor.verticle.ReprocessorVerticle; import io.mosip.registration.processor.rest.client.service.impl.RegistrationProcessorRestClientServiceImpl; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; /** * Config class to get configurations and beans for Reprocessor Verticle @@ -18,6 +22,7 @@ */ @PropertySource("classpath:bootstrap.properties") @Configuration +@EnableAsync public class ReprocessorConfigBeans { @Bean @@ -34,4 +39,16 @@ public RegistrationProcessorRestClientService getRegistrationProcessorRe public PacketManagerService getPacketManagerService() { return new PacketManagerService(); } + + // 🔹 Add this bean + @Bean(name = "taskExecutor") + public Executor taskExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(10); + executor.setMaxPoolSize(20); + executor.setQueueCapacity(100); + executor.setThreadNamePrefix("Reprocessor-Async-"); + executor.initialize(); + return executor; + } } diff --git a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/dto/ProcessAllocation.java b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/dto/ProcessAllocation.java new file mode 100644 index 00000000000..64e0e21755d --- /dev/null +++ b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/dto/ProcessAllocation.java @@ -0,0 +1,26 @@ +package io.mosip.registration.processor.reprocessor.dto; + +import lombok.Data; +import lombok.Getter; +import lombok.Setter; + +import java.util.List; + +@Data +@Setter +@Getter +public class ProcessAllocation { + private List processes; + private List statuses; + private int percentageAllocation; + + @Override + public String toString() { + return "ProcessAllocation{" + + "processes=" + processes + + ", statuses=" + statuses + + ", percentageAllocation=" + percentageAllocation + + '}'; + } +} + diff --git a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java index bebd730a69a..6b9845adc76 100644 --- a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java +++ b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java @@ -1,12 +1,13 @@ package io.mosip.registration.processor.reprocessor.verticle; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import java.util.*; +import java.util.concurrent.*; +import java.util.stream.Collectors; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.mosip.registration.processor.reprocessor.dto.ProcessAllocation; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -22,10 +23,10 @@ import io.mosip.registration.processor.core.abstractverticle.MosipVerticleAPIManager; import io.mosip.registration.processor.core.code.EventId; import io.mosip.registration.processor.core.code.EventName; +import io.mosip.registration.processor.core.code.RegistrationTransactionTypeCode; import io.mosip.registration.processor.core.code.EventType; import io.mosip.registration.processor.core.code.ModuleName; import io.mosip.registration.processor.core.code.RegistrationTransactionStatusCode; -import io.mosip.registration.processor.core.code.RegistrationTransactionTypeCode; import io.mosip.registration.processor.core.constant.LoggerFileConstant; import io.mosip.registration.processor.core.exception.util.PlatformErrorMessages; import io.mosip.registration.processor.core.exception.util.PlatformSuccessMessages; @@ -45,6 +46,8 @@ import io.vertx.core.eventbus.EventBus; import io.vertx.core.json.JsonObject; +import javax.annotation.PostConstruct; + /** * The Reprocessor Verticle to deploy the scheduler and implement re-processing * logic @@ -96,8 +99,24 @@ public class ReprocessorVerticle extends MosipVerticleAPIManager { @Value("#{'${registration.processor.reprocess.restart-trigger-filter}'.split(',')}") private List reprocessRestartTriggerFilter; + @Value("${registration.processor.reprocess.process-based.cache-enabled:false}") + private Boolean enabledProcessBasedCache; + + @Value("${registration.processor.reprocess.process-based.fetch-count-map}") + private String processBasedFetchCountMapJson; + + private List processBasedFetchCountMap; + + @Value("${registration.processor.reprocess.process-based.prefetch-limit:500}") + private int processBasedPrefetchLimit; + + @Value("${registration.processor.reprocess.process-based.threshold:50}") + private int processBasedThreashold; + + private ConcurrentHashMap> packetCacheMap = new ConcurrentHashMap<>(); + /** The is transaction successful. */ - boolean isTransactionSuccessful; + private boolean isTransactionSuccessful = false; /** The registration status service. */ @Autowired @@ -115,13 +134,19 @@ public class ReprocessorVerticle extends MosipVerticleAPIManager { @Value("${server.port}") private String port; + @PostConstruct + public void init() throws JsonProcessingException { + ObjectMapper mapper = new ObjectMapper(); + processBasedFetchCountMap = + mapper.readValue(processBasedFetchCountMapJson, new TypeReference>() {}); + } + /** * Deploy verticle. */ public void deployVerticle() { mosipEventBus = this.getEventBus(this, clusterManagerUrl); deployScheduler(getVertx()); - } /** @@ -220,31 +245,64 @@ public void start() { public MessageDTO process(MessageDTO object) { List reprocessorDtoList = null; LogDescription description = new LogDescription(); - List statusList = new ArrayList<>(); - statusList.add(RegistrationTransactionStatusCode.SUCCESS.toString()); - statusList.add(RegistrationTransactionStatusCode.REPROCESS.toString()); - statusList.add(RegistrationTransactionStatusCode.IN_PROGRESS.toString()); + List trnStatusList = new ArrayList<>(); + trnStatusList.add(RegistrationTransactionStatusCode.SUCCESS.toString()); + trnStatusList.add(RegistrationTransactionStatusCode.REPROCESS.toString()); + trnStatusList.add(RegistrationTransactionStatusCode.IN_PROGRESS.toString()); regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", "ReprocessorVerticle::process()::entry"); - StringBuffer ridSb=new StringBuffer(); + StringBuffer ridSb = new StringBuffer(); try { Map> reprocessRestartTriggerMap = intializeReprocessRestartTriggerMapping(); reprocessorDtoList = registrationStatusService.getResumablePackets(fetchSize); - if (!CollectionUtils.isEmpty(reprocessorDtoList)) { - if (reprocessorDtoList.size() < fetchSize) { - List reprocessorPacketList = registrationStatusService.getUnProcessedPackets(fetchSize - reprocessorDtoList.size(), elapseTime, - reprocessCount, statusList, reprocessExcludeStageNames); - if (!CollectionUtils.isEmpty(reprocessorPacketList)) { - reprocessorDtoList.addAll(reprocessorPacketList); - } + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + "Resumable Packets Count " + reprocessorDtoList.size() ); + + if(enabledProcessBasedCache) { + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + "Enabled Process based fetch"); + if(reprocessorDtoList.size() < fetchSize) { + LinkedHashMap requiredCountMap = prepareRequiredCount( + fetchSize - reprocessorDtoList.size() + ); + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + "Prepared process based required count details for fetch " + requiredCountMap.toString()); + cachePacketsIfBelowThreshold(requiredCountMap, trnStatusList); + regProcLogger.debug( + "Caching Packets if below Threshold method completed. Packet sizes: {}", + formatPacketCacheMap() + ); + + reprocessorDtoList.addAll(fetchUnprocessedPacketFromCache( + requiredCountMap, + fetchSize - reprocessorDtoList.size() + )); + } } else { - reprocessorDtoList = registrationStatusService.getUnProcessedPackets(fetchSize, elapseTime, - reprocessCount, statusList, reprocessExcludeStageNames); + if (!CollectionUtils.isEmpty(reprocessorDtoList)) { + if (reprocessorDtoList.size() < fetchSize) { + List reprocessorPacketList = registrationStatusService.getUnProcessedPackets(fetchSize - reprocessorDtoList.size(), elapseTime, + reprocessCount, trnStatusList, reprocessExcludeStageNames); + if (!CollectionUtils.isEmpty(reprocessorPacketList)) { + reprocessorDtoList.addAll(reprocessorPacketList); + } + } + } else { + reprocessorDtoList = registrationStatusService.getUnProcessedPackets(fetchSize, elapseTime, + reprocessCount, trnStatusList, reprocessExcludeStageNames); + } } - + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + "Reprocessor Total Packets Fetched " + reprocessorDtoList.size()); + if (!CollectionUtils.isEmpty(reprocessorDtoList)) { + List processedList = new ArrayList<>(); + /** Module-Id can be Both Success/Error code */ + String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); + String moduleName = ModuleName.RE_PROCESSOR.toString(); + reprocessorDtoList.forEach(dto -> { String registrationId = dto.getRegistrationId(); ridSb.append(registrationId); @@ -274,48 +332,46 @@ public MessageDTO process(MessageDTO object) { if (isRestartFromStageRequired(dto, reprocessRestartTriggerMap)) { stageName = MessageBusUtil.getMessageBusAdress(reprocessRestartFromStage); stageName = stageName.concat(ReprocessorConstants.BUS_IN); - sendAndSetStatus(dto, messageDTO, stageName); - dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage()); - dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode()); - description - .setMessage( - PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS - .getMessage()); - description.setCode( - PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS - .getCode()); + sendAndSetStatus(dto, messageDTO, stageName); + dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage()); + dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode()); + description + .setMessage( + PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS + .getMessage()); + description.setCode( + PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS + .getCode()); } else { stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName()); - if (RegistrationTransactionStatusCode.SUCCESS.name() - .equalsIgnoreCase(dto.getLatestTransactionStatusCode())) { - stageName = stageName.concat(ReprocessorConstants.BUS_OUT); - } else { - stageName = stageName.concat(ReprocessorConstants.BUS_IN); - } + if (RegistrationTransactionStatusCode.SUCCESS.name() + .equalsIgnoreCase(dto.getLatestTransactionStatusCode())) { + stageName = stageName.concat(ReprocessorConstants.BUS_OUT); + } else { + stageName = stageName.concat(ReprocessorConstants.BUS_IN); + } sendAndSetStatus(dto, messageDTO, stageName); - dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage()); - dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode()); - description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage()); - description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()); + dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage()); + dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode()); + description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage()); + description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()); } } regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage()); - /** Module-Id can be Both Success/Error code */ - String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); - String moduleName = ModuleName.RE_PROCESSOR.toString(); - registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName); String eventId = EventId.RPR_402.toString(); String eventName = EventName.UPDATE.toString(); String eventType = EventType.BUSINESS.toString(); + processedList.add(dto); if (!isTransactionSuccessful) auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType, moduleId, moduleName, registrationId); }); - + + registrationStatusService.updateRegistrationStatusForWorkflowEngines(processedList, moduleId, moduleName); } } catch (TablenotAccessibleException e) { isTransactionSuccessful = false; @@ -326,7 +382,7 @@ public MessageDTO process(MessageDTO object) { description.getCode() + " -- ", PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e.toString()); - }catch (Exception ex) { + } catch (Exception ex) { isTransactionSuccessful = false; description.setMessage(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage()); description.setCode(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getCode()); @@ -351,7 +407,7 @@ public MessageDTO process(MessageDTO object) { : description.getCode(); String moduleName = ModuleName.RE_PROCESSOR.toString(); auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType, - moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):"")); + moduleId, moduleName, ridSb.toString()); } return object; @@ -422,4 +478,148 @@ private void sendAndSetStatus(InternalRegistrationStatusDto dto, MessageDTO mess protected String getPropertyPrefix() { return VERTICLE_PROPERTY_PREFIX; } + + private LinkedHashMap prepareRequiredCount(int availableCount) { + LinkedHashMap requiredCountMap = new LinkedHashMap<>(); + int totalMapCount = processBasedFetchCountMap.size(); + int index = 1; + int allocated = 0; + + for(ProcessAllocation processAllocation : processBasedFetchCountMap) { + int requiredCount; + + if(index == totalMapCount) { + // Last entry → allocate the remaining + requiredCount = fetchSize - allocated; + } else { + // Percentage-based allocation + requiredCount = (availableCount * processAllocation.getPercentageAllocation()) /100; + } + + // Ensure at least 1 + requiredCount = Math.max(requiredCount, 1); + + // Prevent overshooting the total + if(allocated + requiredCount > availableCount) { + requiredCount = availableCount - allocated; + } + + requiredCountMap.put(processAllocation, requiredCount); + allocated += requiredCount; + index++; + } + return requiredCountMap; + } + + private void cachePacketsIfBelowThreshold(LinkedHashMap requiredCountMap, List trnStatusList) { + List> futures = requiredCountMap.entrySet().stream() + .filter(entry -> { + String key = getKeyFromProcessAllocation(entry.getKey()); + ConcurrentLinkedQueue cachedPackets = packetCacheMap.get(key); + return cachedPackets == null || cachedPackets.size() < processBasedThreashold; + }) + .map(entry -> { + ProcessAllocation processAllocation = entry.getKey(); + String key = getKeyFromProcessAllocation(processAllocation); + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + "Fetch Records from Database for Process " + key); + + List processList = processAllocation.getProcesses(); + List statusValList = processAllocation.getStatuses(); + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + "Status used to fetch Records Process " + key + " is " + statusValList); + + ConcurrentLinkedQueue cacheList = packetCacheMap.get(key); + + // Fetch unprocessed packets + List skipRegIdList = new ArrayList<>(cacheList != null && !cacheList.isEmpty() ? cacheList.stream().map(e -> e.getRegistrationId()).collect(Collectors.toList()) : Collections.emptyList()); + + return registrationStatusService.getUnProcessedPackets(processList, processBasedPrefetchLimit, elapseTime, + reprocessCount, trnStatusList, reprocessExcludeStageNames, statusValList) + .thenAccept(result -> { + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + "Total Record Fetched from database for process " + key + " is " + result.size()); + + List filteredList; + if(!skipRegIdList.isEmpty()) + filteredList = result.stream().filter(obj -> !skipRegIdList.contains(obj.getRegistrationId())).collect(Collectors.toList()); + else + filteredList = result; + + // Thread-safe update to cache + packetCacheMap.compute(key, (k, existingList) -> { + if (existingList == null) return new ConcurrentLinkedQueue<>(filteredList); + existingList.addAll(filteredList); + return existingList; + }); + }) + .exceptionally(ex -> { + regProcLogger.error("Error Triggered for Process [{}] and Status [{}]", processList, statusValList, ex); + return null; + }); + }) + .collect(Collectors.toList()); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + } + + private String getKeyFromProcessAllocation(ProcessAllocation processAllocation) { + return String.join(",", processAllocation.getProcesses()) + "#" + String.join(",", processAllocation.getStatuses()); + } + + private List fetchUnprocessedPacketFromCache(LinkedHashMap requiredCountMap, int fetchCount) { + int previousBalanceCount = 0; + int emptyCacheCount = 0; + List reprocessorPacketList = new ArrayList<>(); + + for(Map.Entry entry : requiredCountMap.entrySet()) { + String key = getKeyFromProcessAllocation(entry.getKey()); + int remainingToFetch = fetchCount - reprocessorPacketList.size(); + if(remainingToFetch <= 0) + break; + + int requiredCount = entry.getValue() + previousBalanceCount; + ConcurrentLinkedQueue cachedPackets = packetCacheMap.getOrDefault(key, new ConcurrentLinkedQueue<>()); + + if(!cachedPackets.isEmpty()) { + int count = Math.min(requiredCount, remainingToFetch); + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + key + " Packets Required Count " + count); + List fetchedPackets = fetchFromCache(cachedPackets, count); + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + key + "Total Packets Fetched from Cache " + fetchedPackets.size()); + reprocessorPacketList.addAll(fetchedPackets); + previousBalanceCount = Math.max(0, requiredCount-fetchedPackets.size()); + } else { + emptyCacheCount++; + previousBalanceCount = requiredCount; + } + regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", + key + "Count to be moved to next process " + previousBalanceCount); + } + + if((reprocessorPacketList.size() < fetchCount) && (requiredCountMap.size() != emptyCacheCount)) + reprocessorPacketList.addAll(fetchUnprocessedPacketFromCache(requiredCountMap, fetchCount-reprocessorPacketList.size())); + + return reprocessorPacketList; + } + + private List fetchFromCache(ConcurrentLinkedQueue cache, int count) { + int actualFetchCount = Math.min(count, cache.size()); + List fetched = new ArrayList<>(actualFetchCount); + for (int i = 0; i < actualFetchCount; i++) { + InternalRegistrationStatusDto dto = cache.poll(); + if (dto == null) break; + fetched.add(dto); + } + return fetched; + } + + private String formatPacketCacheMap() { + return packetCacheMap.entrySet() + .stream() + .map(e -> e.getKey() + " = " + e.getValue().size()) + .collect(Collectors.joining(", ", "[", "]")); + } + } diff --git a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/resources/bootstrap.properties b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/resources/bootstrap.properties index 1403ff403b7..38471d6e54b 100644 --- a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/resources/bootstrap.properties +++ b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/resources/bootstrap.properties @@ -17,4 +17,10 @@ registration.processor.identityjson=RegistrationProcessorIdentity.json registration.processor.demographic.identity=identity packet.info.storage.service=registration-processor-packet-info-storage-service config.server.file.storage.uri=${spring.cloud.config.uri}/${packet.info.storage.service}/${spring.profiles.active}/${spring.cloud.config.label}/ -mosip.regproc.registration.status.service.disable-audit=true \ No newline at end of file +mosip.regproc.registration.status.service.disable-audit=true +spring.jpa.properties.hibernate.jdbc.batch_size=50 +spring.jpa.properties.hibernate.order_updates=true +spring.jpa.properties.hibernate.order_inserts=true +spring.jpa.properties.hibernate.batch_versioned_data=true +logging.level.org.hibernate.SQL=DEBUG +logging.level.org.hibernate.type.descriptor.sql=TRACE \ No newline at end of file diff --git a/registration-processor/workflow-engine/registration-processor-reprocessor/src/test/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticleTest.java b/registration-processor/workflow-engine/registration-processor-reprocessor/src/test/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticleTest.java index b028d511454..6a352465d1d 100644 --- a/registration-processor/workflow-engine/registration-processor-reprocessor/src/test/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticleTest.java +++ b/registration-processor/workflow-engine/registration-processor-reprocessor/src/test/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticleTest.java @@ -7,9 +7,10 @@ import java.lang.reflect.Field; import java.time.LocalDateTime; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import io.mosip.registration.processor.reprocessor.dto.ProcessAllocation; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -117,10 +118,21 @@ public void setup() throws Exception { //Mockito.doNothing().when(description).setMessage(Mockito.anyString()); //Mockito.when(description.getCode()).thenReturn("CODE"); //Mockito.when(description.getMessage()).thenReturn("MESSAGE"); - ReflectionTestUtils.setField(reprocessorVerticle, "fetchSize", 2); - ReflectionTestUtils.setField(reprocessorVerticle, "elapseTime", 21600); - ReflectionTestUtils.setField(reprocessorVerticle, "reprocessCount", 3); - ReflectionTestUtils.setField(reprocessorVerticle, "reprocessExcludeStageNames", new ArrayList<>()); + ReflectionTestUtils.setField(reprocessorVerticle, "enabledProcessBasedCache", false); + List list = new ArrayList<>(); + ProcessAllocation processAllocation1 = new ProcessAllocation(); + processAllocation1.setPercentageAllocation(40); + List processList = new ArrayList<>(); + processList.add("NEW"); + processAllocation1.setProcesses(processList); + list.add(processAllocation1); + ReflectionTestUtils.setField(reprocessorVerticle, "processBasedFetchCountMap", list); + ReflectionTestUtils.setField(reprocessorVerticle, "fetchSize", 4); + ReflectionTestUtils.setField(reprocessorVerticle, "elapseTime", 21600); + ReflectionTestUtils.setField(reprocessorVerticle, "reprocessCount", 3); + ReflectionTestUtils.setField(reprocessorVerticle, "processBasedPrefetchLimit", 500); + ReflectionTestUtils.setField(reprocessorVerticle, "processBasedThreashold", 50); + ReflectionTestUtils.setField(reprocessorVerticle, "reprocessExcludeStageNames", new ArrayList<>()); List reprocessRestartTriggerFilterList = new ArrayList<>(); reprocessRestartTriggerFilterList.add("DemodedupStage:Success"); reprocessRestartTriggerFilterList.add("BioDedupeStage:*"); @@ -175,7 +187,34 @@ public void testProcessValid() throws TablenotAccessibleException, PacketManager reprocessorVerticle.process(dto); } - + + @Test + public void testProcessValidNew() throws TablenotAccessibleException, PacketManagerException, + ApisResourceAccessException, WorkflowActionException { + ReflectionTestUtils.setField(reprocessorVerticle, "enabledProcessBasedCache", true); + List dtolist = new ArrayList<>(); + InternalRegistrationStatusDto registrationStatusDto = new InternalRegistrationStatusDto(); + + registrationStatusDto.setRegistrationId("2018701130000410092018110735"); + registrationStatusDto.setRegistrationType(RegistrationType.NEW.toString()); + registrationStatusDto.setRegistrationStageName("PacketValidatorStage"); + registrationStatusDto.setDefaultResumeAction("RESUME_PROCESSING"); + registrationStatusDto.setResumeTimeStamp(LocalDateTime.now()); + registrationStatusDto.setReProcessRetryCount(0); + registrationStatusDto.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.REPROCESS.toString()); + dtolist.add(registrationStatusDto); + InternalRegistrationStatusDto registrationStatusDto1 = new InternalRegistrationStatusDto(); + + registrationStatusDto1.setRegistrationId("2018701130000410092018110734"); + registrationStatusDto1.setRegistrationStageName("PacketValidatorStage"); + registrationStatusDto1.setReProcessRetryCount(1); + registrationStatusDto1.setRegistrationType("NEW"); + registrationStatusDto1.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.SUCCESS.toString()); + dtolist.add(registrationStatusDto1); + reprocessorVerticle.process(dto); + + } + @Test public void testProcessFailure() throws TablenotAccessibleException, PacketManagerException, ApisResourceAccessException, WorkflowActionException { @@ -205,6 +244,34 @@ public void testProcessFailure() throws TablenotAccessibleException, PacketManag } + @Test + public void testProcessFailureNew() throws TablenotAccessibleException, PacketManagerException, + ApisResourceAccessException, WorkflowActionException { + ReflectionTestUtils.setField(reprocessorVerticle, "enabledProcessBasedCache", true); + + List dtolist = new ArrayList<>(); + InternalRegistrationStatusDto registrationStatusDto = new InternalRegistrationStatusDto(); + + registrationStatusDto.setRegistrationId("2018701130000410092018110735"); + registrationStatusDto.setRegistrationStageName("PacketValidatorStage"); + + registrationStatusDto.setDefaultResumeAction("RESUME_PROCESSING"); + registrationStatusDto.setResumeTimeStamp(LocalDateTime.now()); + registrationStatusDto.setRegistrationType("NEW"); + registrationStatusDto.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.REPROCESS.toString()); + dtolist.add(registrationStatusDto); + InternalRegistrationStatusDto registrationStatusDto1 = new InternalRegistrationStatusDto(); + + registrationStatusDto1.setRegistrationId("2018701130000410092018110734"); + registrationStatusDto1.setRegistrationStageName("PacketValidatorStage"); + registrationStatusDto1.setReProcessRetryCount(3); + registrationStatusDto1.setRegistrationType("NEW"); + registrationStatusDto1.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.SUCCESS.toString()); + dtolist.add(registrationStatusDto1); + reprocessorVerticle.process(dto); + + } + /** * Exception test. * @@ -219,7 +286,15 @@ public void exceptionTest() throws Exception { assertEquals(null, dto.getIsValid()); } - + + @Test + public void exceptionTestNew() throws Exception { + ReflectionTestUtils.setField(reprocessorVerticle, "enabledProcessBasedCache", true); + dto = reprocessorVerticle.process(dto); + assertEquals(null, dto.getIsValid()); + + } + @Test public void nullPointerExceptionTest() throws Exception { Mockito.when(registrationStatusService.getResumablePackets(anyInt())) @@ -233,7 +308,14 @@ public void TablenotAccessibleExceptionTest() throws Exception { Mockito.when(registrationStatusService.getUnProcessedPackets(anyInt(), anyLong(), anyInt(), anyList(), anyList())) .thenThrow(new TablenotAccessibleException("") { }); + dto = reprocessorVerticle.process(dto); + assertEquals(true, dto.getInternalError()); + } + + @Test + public void TablenotAccessibleExceptionTestNew() throws Exception { + ReflectionTestUtils.setField(reprocessorVerticle, "enabledProcessBasedCache", true); dto = reprocessorVerticle.process(dto); assertEquals(true, dto.getInternalError()); @@ -271,6 +353,36 @@ public void testProcessValidWithResumablePackets() throws TablenotAccessibleExce } + @Test + public void testProcessValidWithResumablePacketsNew() throws TablenotAccessibleException, PacketManagerException, + ApisResourceAccessException, WorkflowActionException { + ReflectionTestUtils.setField(reprocessorVerticle, "enabledProcessBasedCache", true); + List dtolist = new ArrayList<>(); + InternalRegistrationStatusDto registrationStatusDto = new InternalRegistrationStatusDto(); + + registrationStatusDto.setRegistrationId("2018701130000410092018110735"); + registrationStatusDto.setRegistrationType(RegistrationType.NEW.toString()); + registrationStatusDto.setRegistrationStageName("PacketValidatorStage"); + registrationStatusDto.setDefaultResumeAction("RESUME_PROCESSING"); + registrationStatusDto.setResumeTimeStamp(LocalDateTime.now()); + registrationStatusDto.setReProcessRetryCount(0); + registrationStatusDto.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.REPROCESS.toString()); + dtolist.add(registrationStatusDto); + List reprocessorDtoList = new ArrayList<>(); + InternalRegistrationStatusDto registrationStatusDto1 = new InternalRegistrationStatusDto(); + + registrationStatusDto1.setRegistrationId("2018701130000410092018110734"); + registrationStatusDto1.setRegistrationStageName("PacketValidatorStage"); + registrationStatusDto1.setReProcessRetryCount(1); + registrationStatusDto1.setRegistrationType("NEW"); + registrationStatusDto1.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.SUCCESS.toString()); + reprocessorDtoList.add(registrationStatusDto1); + Mockito.when(registrationStatusService.getResumablePackets(anyInt())) + .thenReturn(dtolist); + reprocessorVerticle.process(dto); + + } + @Test public void testProcessWithRestartFromStage() throws TablenotAccessibleException, PacketManagerException, @@ -293,4 +405,23 @@ public void testProcessWithRestartFromStage() throws TablenotAccessibleException } + @Test + public void testProcessWithRestartFromStageNew() throws TablenotAccessibleException, + PacketManagerException, + ApisResourceAccessException, WorkflowActionException { + ReflectionTestUtils.setField(reprocessorVerticle, "enabledProcessBasedCache", true); + + List dtolist = new ArrayList<>(); + InternalRegistrationStatusDto registrationStatusDto = new InternalRegistrationStatusDto(); + + registrationStatusDto.setRegistrationId("2018701130000410092018110735"); + registrationStatusDto.setRegistrationType(RegistrationType.NEW.toString()); + registrationStatusDto.setRegistrationStageName("BioDedupeStage"); + registrationStatusDto.setReProcessRetryCount(0); + registrationStatusDto.setStatusCode(RegistrationStatusCode.PROCESSING.toString()); + registrationStatusDto.setLatestTransactionStatusCode(RegistrationTransactionStatusCode.REPROCESS.toString()); + dtolist.add(registrationStatusDto); + reprocessorVerticle.process(dto); + + } }