diff --git a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java index bebd730a69a..598a96f2355 100644 --- a/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java +++ b/registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java @@ -7,6 +7,8 @@ import java.util.Map; import java.util.Set; + +import io.vertx.core.*; import org.apache.commons.lang3.exception.ExceptionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -26,6 +28,7 @@ import io.mosip.registration.processor.core.code.ModuleName; import io.mosip.registration.processor.core.code.RegistrationTransactionStatusCode; import io.mosip.registration.processor.core.code.RegistrationTransactionTypeCode; +import io.mosip.registration.processor.core.constant.AuditLogConstant; import io.mosip.registration.processor.core.constant.LoggerFileConstant; import io.mosip.registration.processor.core.exception.util.PlatformErrorMessages; import io.mosip.registration.processor.core.exception.util.PlatformSuccessMessages; @@ -40,8 +43,6 @@ import io.mosip.registration.processor.status.dto.RegistrationStatusDto; import io.mosip.registration.processor.status.exception.TablenotAccessibleException; import io.mosip.registration.processor.status.service.RegistrationStatusService; -import io.vertx.core.AsyncResult; -import io.vertx.core.Vertx; import io.vertx.core.eventbus.EventBus; import io.vertx.core.json.JsonObject; @@ -96,8 +97,8 @@ public class ReprocessorVerticle extends MosipVerticleAPIManager { @Value("#{'${registration.processor.reprocess.restart-trigger-filter}'.split(',')}") private List reprocessRestartTriggerFilter; - /** The is transaction successful. */ - boolean isTransactionSuccessful; + /** The is batch successful. */ + boolean isBatchSuccessful; /** The registration status service. */ @Autowired @@ -209,12 +210,15 @@ public void start() { this.createServer(router.getRouter(), Integer.parseInt(port)); } - /* - * (non-Javadoc) - * - * @see - * io.mosip.registration.processor.core.spi.eventbus.EventBusManager#process( - * java.lang.Object) + /** + * Triggers a batch reprocessing run for resumable and eligible registrations. + * + *

Fetches up to the configured batch size of resumable or unprocessed registration packets, + * processes each entry asynchronously, and records a batch-level audit event; on unrecoverable + * errors the provided message is marked with an internal error flag.

+ * + * @param object a MessageDTO used as the invocation context; may be updated (for example its internalError flag and description) to reflect processing outcome + * @return the same MessageDTO instance passed in, potentially modified to indicate internal error or status information */ @Override public MessageDTO process(MessageDTO object) { @@ -226,9 +230,8 @@ public MessageDTO process(MessageDTO object) { statusList.add(RegistrationTransactionStatusCode.IN_PROGRESS.toString()); regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "", "ReprocessorVerticle::process()::entry"); - StringBuffer ridSb=new StringBuffer(); try { - Map> reprocessRestartTriggerMap = intializeReprocessRestartTriggerMapping(); + Map> reprocessRestartTriggerMap = initializeReprocessRestartTriggerMapping(); reprocessorDtoList = registrationStatusService.getResumablePackets(fetchSize); if (!CollectionUtils.isEmpty(reprocessorDtoList)) { if (reprocessorDtoList.size() < fetchSize) { @@ -243,82 +246,24 @@ public MessageDTO process(MessageDTO object) { reprocessCount, statusList, reprocessExcludeStageNames); } - if (!CollectionUtils.isEmpty(reprocessorDtoList)) { + regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size()); + List futures = new ArrayList<>(); reprocessorDtoList.forEach(dto -> { - String registrationId = dto.getRegistrationId(); - ridSb.append(registrationId); - ridSb.append(","); - MessageDTO messageDTO = new MessageDTO(); - messageDTO.setRid(registrationId); - messageDTO.setReg_type(dto.getRegistrationType()); - messageDTO.setSource(dto.getSource()); - messageDTO.setIteration(dto.getIteration()); - messageDTO.setWorkflowInstanceId(dto.getWorkflowInstanceId()); - if (reprocessCount.equals(dto.getReProcessRetryCount())) { - dto.setLatestTransactionStatusCode( - RegistrationTransactionStatusCode.REPROCESS_FAILED.toString()); - dto.setLatestTransactionTypeCode( - RegistrationTransactionTypeCode.PACKET_REPROCESS.toString()); - dto.setStatusComment(StatusUtil.RE_PROCESS_FAILED.getMessage()); - dto.setStatusCode(RegistrationStatusCode.REPROCESS_FAILED.toString()); - dto.setSubStatusCode(StatusUtil.RE_PROCESS_FAILED.getCode()); - messageDTO.setIsValid(false); - description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage()); - description.setCode(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode()); - - } else { - messageDTO.setIsValid(true); - isTransactionSuccessful = true; - String stageName; - if (isRestartFromStageRequired(dto, reprocessRestartTriggerMap)) { - stageName = MessageBusUtil.getMessageBusAdress(reprocessRestartFromStage); - stageName = stageName.concat(ReprocessorConstants.BUS_IN); - sendAndSetStatus(dto, messageDTO, stageName); - dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage()); - dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode()); - description - .setMessage( - PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS - .getMessage()); - description.setCode( - PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS - .getCode()); - - } else { - stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName()); - if (RegistrationTransactionStatusCode.SUCCESS.name() - .equalsIgnoreCase(dto.getLatestTransactionStatusCode())) { - stageName = stageName.concat(ReprocessorConstants.BUS_OUT); - } else { - stageName = stageName.concat(ReprocessorConstants.BUS_IN); - } - sendAndSetStatus(dto, messageDTO, stageName); - dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage()); - dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode()); - description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage()); - description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()); - } - } - regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), - LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage()); - - /** Module-Id can be Both Success/Error code */ - String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); - String moduleName = ModuleName.RE_PROCESSOR.toString(); - registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName); - String eventId = EventId.RPR_402.toString(); - String eventName = EventName.UPDATE.toString(); - String eventType = EventType.BUSINESS.toString(); - - if (!isTransactionSuccessful) - auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, - eventType, moduleId, moduleName, registrationId); + Promise promise = Promise.promise(); + vertx.executeBlocking(p -> { + processDTO(reprocessRestartTriggerMap, dto); + p.complete(); + }, false, res -> {promise.complete();}); + futures.add(promise.future()); + }); + CompositeFuture.all(futures).onComplete(ar -> { + regProcLogger.info("Successfully processed count - {}", futures.size()); + isBatchSuccessful = true; }); - } } catch (TablenotAccessibleException e) { - isTransactionSuccessful = false; + isBatchSuccessful = false; object.setInternalError(Boolean.TRUE); description.setMessage(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage()); description.setCode(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getCode()); @@ -326,8 +271,8 @@ public MessageDTO process(MessageDTO object) { description.getCode() + " -- ", PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e.toString()); - }catch (Exception ex) { - isTransactionSuccessful = false; + } catch (Exception ex) { + isBatchSuccessful = false; description.setMessage(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage()); description.setCode(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getCode()); regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), @@ -339,25 +284,113 @@ public MessageDTO process(MessageDTO object) { } finally { regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), null, description.getMessage()); - if (isTransactionSuccessful) + if (isBatchSuccessful) description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage()); - String eventId = isTransactionSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString(); - String eventName = isTransactionSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString(); - String eventType = isTransactionSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString(); + String eventId = isBatchSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString(); + String eventName = isBatchSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString(); + String eventType = isBatchSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString(); /** Module-Id can be Both Success/Error code */ - String moduleId = isTransactionSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() + String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode() : description.getCode(); String moduleName = ModuleName.RE_PROCESSOR.toString(); auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType, - moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):"")); + moduleId, moduleName, AuditLogConstant.MULTIPLE_ID.name()); } - return object; } - private Map> intializeReprocessRestartTriggerMapping() { + /** + * Processes a single registration DTO for reprocessing: decides restart vs continue, sends the appropriate message to the message bus, updates the DTO's status and workflow status, and records an audit entry when the per-DTO transaction fails. + * + *

Side effects: may modify the provided {@code dto} (status fields and retry count), send a message to a message-bus address, persist workflow status via {@code registrationStatusService}, and create an audit log entry if the transaction is not successful.

+ * + * @param reprocessRestartTriggerMap mapping of stage names to the set of latest transaction status codes that should trigger restarting from the configured restart stage + * @param dto the internal registration status DTO to process and update + */ + private void processDTO(Map> reprocessRestartTriggerMap, InternalRegistrationStatusDto dto) { + + boolean isTransactionSuccessful = false; + LogDescription description = new LogDescription(); + + String registrationId = dto.getRegistrationId(); + MessageDTO messageDTO = new MessageDTO(); + messageDTO.setRid(registrationId); + messageDTO.setReg_type(dto.getRegistrationType()); + messageDTO.setSource(dto.getSource()); + messageDTO.setIteration(dto.getIteration()); + messageDTO.setWorkflowInstanceId(dto.getWorkflowInstanceId()); + if (reprocessCount.equals(dto.getReProcessRetryCount())) { + dto.setLatestTransactionStatusCode( + RegistrationTransactionStatusCode.REPROCESS_FAILED.toString()); + dto.setLatestTransactionTypeCode( + RegistrationTransactionTypeCode.PACKET_REPROCESS.toString()); + dto.setStatusComment(StatusUtil.RE_PROCESS_FAILED.getMessage()); + dto.setStatusCode(RegistrationStatusCode.REPROCESS_FAILED.toString()); + dto.setSubStatusCode(StatusUtil.RE_PROCESS_FAILED.getCode()); + messageDTO.setIsValid(false); + description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage()); + description.setCode(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode()); + + } else { + messageDTO.setIsValid(true); + isTransactionSuccessful = true; + String stageName; + if (isRestartFromStageRequired(dto, reprocessRestartTriggerMap)) { + stageName = MessageBusUtil.getMessageBusAdress(reprocessRestartFromStage); + stageName = stageName.concat(ReprocessorConstants.BUS_IN); + sendAndSetStatus(dto, messageDTO, stageName); + dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage()); + dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode()); + description + .setMessage( + PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS + .getMessage()); + description.setCode( + PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS + .getCode()); + + } else { + stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName()); + if (RegistrationTransactionStatusCode.SUCCESS.name() + .equalsIgnoreCase(dto.getLatestTransactionStatusCode())) { + stageName = stageName.concat(ReprocessorConstants.BUS_OUT); + } else { + stageName = stageName.concat(ReprocessorConstants.BUS_IN); + } + sendAndSetStatus(dto, messageDTO, stageName); + dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage()); + dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode()); + description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage()); + description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()); + } + } + regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), + LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage()); + + /** Module-Id can be Both Success/Error code */ + String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); + String moduleName = ModuleName.RE_PROCESSOR.toString(); + registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName); + String eventId = EventId.RPR_402.toString(); + String eventName = EventName.UPDATE.toString(); + String eventType = EventType.BUSINESS.toString(); + + if (!isTransactionSuccessful) { + auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, + eventType, moduleId, moduleName, registrationId); + } + } + + /** + * Builds a mapping from stage name to the set of latest transaction status codes that should trigger a restart from that stage. + * + * The map's keys are stage names and each value is a set of uppercase status codes (or the full set: SUCCESS, IN_PROGRESS, REPROCESS when `"*"` is specified). + * + * @return a map where each key is a stage name and the corresponding value is the set of latest transaction status codes that trigger a restart for that stage + */ + private Map> initializeReprocessRestartTriggerMapping() { Map> reprocessRestartTriggerMap = new HashMap>(); for (String filter : reprocessRestartTriggerFilter) { String[] stageAndStatus = filter.split(":"); @@ -422,4 +455,4 @@ private void sendAndSetStatus(InternalRegistrationStatusDto dto, MessageDTO mess protected String getPropertyPrefix() { return VERTICLE_PROPERTY_PREFIX; } -} +} \ No newline at end of file