Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.Map;
import java.util.Set;


import io.vertx.core.*;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
Expand All @@ -26,6 +28,7 @@
import io.mosip.registration.processor.core.code.ModuleName;
import io.mosip.registration.processor.core.code.RegistrationTransactionStatusCode;
import io.mosip.registration.processor.core.code.RegistrationTransactionTypeCode;
import io.mosip.registration.processor.core.constant.AuditLogConstant;
import io.mosip.registration.processor.core.constant.LoggerFileConstant;
import io.mosip.registration.processor.core.exception.util.PlatformErrorMessages;
import io.mosip.registration.processor.core.exception.util.PlatformSuccessMessages;
Expand All @@ -40,8 +43,6 @@
import io.mosip.registration.processor.status.dto.RegistrationStatusDto;
import io.mosip.registration.processor.status.exception.TablenotAccessibleException;
import io.mosip.registration.processor.status.service.RegistrationStatusService;
import io.vertx.core.AsyncResult;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.json.JsonObject;

Expand Down Expand Up @@ -96,8 +97,8 @@ public class ReprocessorVerticle extends MosipVerticleAPIManager {
@Value("#{'${registration.processor.reprocess.restart-trigger-filter}'.split(',')}")
private List<String> reprocessRestartTriggerFilter;

/** The is transaction successful. */
boolean isTransactionSuccessful;
/** The is batch successful. */
boolean isBatchSuccessful;

/** The registration status service. */
@Autowired
Expand Down Expand Up @@ -209,12 +210,15 @@ public void start() {
this.createServer(router.getRouter(), Integer.parseInt(port));
}

/*
* (non-Javadoc)
*
* @see
* io.mosip.registration.processor.core.spi.eventbus.EventBusManager#process(
* java.lang.Object)
/**
* Triggers a batch reprocessing run for resumable and eligible registrations.
*
* <p>Fetches up to the configured batch size of resumable or unprocessed registration packets,
* processes each entry asynchronously, and records a batch-level audit event; on unrecoverable
* errors the provided message is marked with an internal error flag.</p>
*
* @param object a MessageDTO used as the invocation context; may be updated (for example its internalError flag and description) to reflect processing outcome
* @return the same MessageDTO instance passed in, potentially modified to indicate internal error or status information
*/
@Override
public MessageDTO process(MessageDTO object) {
Expand All @@ -226,9 +230,8 @@ public MessageDTO process(MessageDTO object) {
statusList.add(RegistrationTransactionStatusCode.IN_PROGRESS.toString());
regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), "",
"ReprocessorVerticle::process()::entry");
StringBuffer ridSb=new StringBuffer();
try {
Map<String, Set<String>> reprocessRestartTriggerMap = intializeReprocessRestartTriggerMapping();
Map<String, Set<String>> reprocessRestartTriggerMap = initializeReprocessRestartTriggerMapping();
reprocessorDtoList = registrationStatusService.getResumablePackets(fetchSize);
if (!CollectionUtils.isEmpty(reprocessorDtoList)) {
if (reprocessorDtoList.size() < fetchSize) {
Expand All @@ -243,91 +246,33 @@ public MessageDTO process(MessageDTO object) {
reprocessCount, statusList, reprocessExcludeStageNames);
}


if (!CollectionUtils.isEmpty(reprocessorDtoList)) {
regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size());
List<Future> futures = new ArrayList<>();
reprocessorDtoList.forEach(dto -> {
String registrationId = dto.getRegistrationId();
ridSb.append(registrationId);
ridSb.append(",");
MessageDTO messageDTO = new MessageDTO();
messageDTO.setRid(registrationId);
messageDTO.setReg_type(dto.getRegistrationType());
messageDTO.setSource(dto.getSource());
messageDTO.setIteration(dto.getIteration());
messageDTO.setWorkflowInstanceId(dto.getWorkflowInstanceId());
if (reprocessCount.equals(dto.getReProcessRetryCount())) {
dto.setLatestTransactionStatusCode(
RegistrationTransactionStatusCode.REPROCESS_FAILED.toString());
dto.setLatestTransactionTypeCode(
RegistrationTransactionTypeCode.PACKET_REPROCESS.toString());
dto.setStatusComment(StatusUtil.RE_PROCESS_FAILED.getMessage());
dto.setStatusCode(RegistrationStatusCode.REPROCESS_FAILED.toString());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_FAILED.getCode());
messageDTO.setIsValid(false);
description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage());
description.setCode(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode());

} else {
messageDTO.setIsValid(true);
isTransactionSuccessful = true;
String stageName;
if (isRestartFromStageRequired(dto, reprocessRestartTriggerMap)) {
stageName = MessageBusUtil.getMessageBusAdress(reprocessRestartFromStage);
stageName = stageName.concat(ReprocessorConstants.BUS_IN);
sendAndSetStatus(dto, messageDTO, stageName);
dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode());
description
.setMessage(
PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS
.getMessage());
description.setCode(
PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS
.getCode());

} else {
stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName());
if (RegistrationTransactionStatusCode.SUCCESS.name()
.equalsIgnoreCase(dto.getLatestTransactionStatusCode())) {
stageName = stageName.concat(ReprocessorConstants.BUS_OUT);
} else {
stageName = stageName.concat(ReprocessorConstants.BUS_IN);
}
sendAndSetStatus(dto, messageDTO, stageName);
dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode());
description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage());
description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode());
}
}
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(),
LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage());

/** Module-Id can be Both Success/Error code */
String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode();
String moduleName = ModuleName.RE_PROCESSOR.toString();
registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName);
String eventId = EventId.RPR_402.toString();
String eventName = EventName.UPDATE.toString();
String eventType = EventType.BUSINESS.toString();

if (!isTransactionSuccessful)
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName,
eventType, moduleId, moduleName, registrationId);
Promise<Void> promise = Promise.promise();
vertx.executeBlocking(p -> {
processDTO(reprocessRestartTriggerMap, dto);
p.complete();
}, false, res -> {promise.complete();});
futures.add(promise.future());
});
CompositeFuture.all(futures).onComplete(ar -> {
regProcLogger.info("Successfully processed count - {}", futures.size());
isBatchSuccessful = true;
});

}
} catch (TablenotAccessibleException e) {
isTransactionSuccessful = false;
isBatchSuccessful = false;
object.setInternalError(Boolean.TRUE);
description.setMessage(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage());
description.setCode(PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getCode());
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(),
description.getCode() + " -- ",
PlatformErrorMessages.RPR_RGS_REGISTRATION_TABLE_NOT_ACCESSIBLE.getMessage(), e.toString());

}catch (Exception ex) {
isTransactionSuccessful = false;
} catch (Exception ex) {
isBatchSuccessful = false;
description.setMessage(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getMessage());
description.setCode(PlatformErrorMessages.REPROCESSOR_VERTICLE_FAILED.getCode());
regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
Expand All @@ -339,25 +284,113 @@ public MessageDTO process(MessageDTO object) {
} finally {
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(),
null, description.getMessage());
if (isTransactionSuccessful)
if (isBatchSuccessful)
description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getMessage());

String eventId = isTransactionSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString();
String eventName = isTransactionSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString();
String eventType = isTransactionSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString();
String eventId = isBatchSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString();
String eventName = isBatchSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString();
String eventType = isBatchSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString();

/** Module-Id can be Both Success/Error code */
String moduleId = isTransactionSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode()
String moduleId = isBatchSuccessful ? PlatformSuccessMessages.RPR_RE_PROCESS_SUCCESS.getCode()
: description.getCode();
String moduleName = ModuleName.RE_PROCESSOR.toString();
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, eventType,
moduleId, moduleName, (ridSb.toString().length()>1?ridSb.substring(0,ridSb.length()-1):""));
moduleId, moduleName, AuditLogConstant.MULTIPLE_ID.name());
}

return object;
}

private Map<String, Set<String>> intializeReprocessRestartTriggerMapping() {
/**
* Processes a single registration DTO for reprocessing: decides restart vs continue, sends the appropriate message to the message bus, updates the DTO's status and workflow status, and records an audit entry when the per-DTO transaction fails.
*
* <p>Side effects: may modify the provided {@code dto} (status fields and retry count), send a message to a message-bus address, persist workflow status via {@code registrationStatusService}, and create an audit log entry if the transaction is not successful.</p>
*
* @param reprocessRestartTriggerMap mapping of stage names to the set of latest transaction status codes that should trigger restarting from the configured restart stage
* @param dto the internal registration status DTO to process and update
*/
private void processDTO(Map<String, Set<String>> reprocessRestartTriggerMap, InternalRegistrationStatusDto dto) {

boolean isTransactionSuccessful = false;
LogDescription description = new LogDescription();

String registrationId = dto.getRegistrationId();
MessageDTO messageDTO = new MessageDTO();
messageDTO.setRid(registrationId);
messageDTO.setReg_type(dto.getRegistrationType());
messageDTO.setSource(dto.getSource());
messageDTO.setIteration(dto.getIteration());
messageDTO.setWorkflowInstanceId(dto.getWorkflowInstanceId());
if (reprocessCount.equals(dto.getReProcessRetryCount())) {
dto.setLatestTransactionStatusCode(
RegistrationTransactionStatusCode.REPROCESS_FAILED.toString());
dto.setLatestTransactionTypeCode(
RegistrationTransactionTypeCode.PACKET_REPROCESS.toString());
dto.setStatusComment(StatusUtil.RE_PROCESS_FAILED.getMessage());
dto.setStatusCode(RegistrationStatusCode.REPROCESS_FAILED.toString());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_FAILED.getCode());
messageDTO.setIsValid(false);
description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage());
description.setCode(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode());

} else {
messageDTO.setIsValid(true);
isTransactionSuccessful = true;
String stageName;
if (isRestartFromStageRequired(dto, reprocessRestartTriggerMap)) {
stageName = MessageBusUtil.getMessageBusAdress(reprocessRestartFromStage);
stageName = stageName.concat(ReprocessorConstants.BUS_IN);
sendAndSetStatus(dto, messageDTO, stageName);
dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode());
description
.setMessage(
PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS
.getMessage());
description.setCode(
PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS
.getCode());

} else {
stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName());
if (RegistrationTransactionStatusCode.SUCCESS.name()
.equalsIgnoreCase(dto.getLatestTransactionStatusCode())) {
stageName = stageName.concat(ReprocessorConstants.BUS_OUT);
} else {
stageName = stageName.concat(ReprocessorConstants.BUS_IN);
}
sendAndSetStatus(dto, messageDTO, stageName);
dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage());
dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode());
description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage());
description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode());
}
}
regProcLogger.info(LoggerFileConstant.SESSIONID.toString(),
LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage());

/** Module-Id can be Both Success/Error code */
String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode();
String moduleName = ModuleName.RE_PROCESSOR.toString();
registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName);
String eventId = EventId.RPR_402.toString();
String eventName = EventName.UPDATE.toString();
String eventType = EventType.BUSINESS.toString();

if (!isTransactionSuccessful) {
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName,
eventType, moduleId, moduleName, registrationId);
}
}

/**
* Builds a mapping from stage name to the set of latest transaction status codes that should trigger a restart from that stage.
*
* The map's keys are stage names and each value is a set of uppercase status codes (or the full set: SUCCESS, IN_PROGRESS, REPROCESS when `"*"` is specified).
*
* @return a map where each key is a stage name and the corresponding value is the set of latest transaction status codes that trigger a restart for that stage
*/
private Map<String, Set<String>> initializeReprocessRestartTriggerMapping() {
Map<String, Set<String>> reprocessRestartTriggerMap = new HashMap<String, Set<String>>();
for (String filter : reprocessRestartTriggerFilter) {
String[] stageAndStatus = filter.split(":");
Expand Down Expand Up @@ -422,4 +455,4 @@ private void sendAndSetStatus(InternalRegistrationStatusDto dto, MessageDTO mess
protected String getPropertyPrefix() {
return VERTICLE_PROPERTY_PREFIX;
}
}
}