-
Notifications
You must be signed in to change notification settings - Fork 204
Parallel packet process using Vertx #2234
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
WalkthroughThe ReprocessorVerticle was refactored to process reprocessor items in parallel using Vert.x executeBlocking with Promises and CompositeFuture; per-item logic moved into a new private processDTO(...); Utilities were consolidated into a new Utility class and callers/tests updated accordingly; several legacy helper methods were removed from Utilities and reimplemented in Utility. Changes
Sequence Diagram(s)sequenceDiagram
participant Reprocessor as ReprocessorVerticle
participant Worker as executeBlocking (thread)
participant DB as Database/StatusRepo
participant Audit as AuditService
Reprocessor->>Worker: for each DTO -> executeBlocking(processDTO)
Worker->>DB: read/update per-item status (blocking)
Worker-->>Reprocessor: Promise success/failure
Reprocessor->>Reprocessor: CompositeFuture.all() aggregate results
alt batch success
Reprocessor->>Audit: emit success event (RPR_402) with moduleId
else batch failure
Reprocessor->>Audit: emit failure event (RPR_405) marked internal error
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Suggested reviewers
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.40.0)registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.javaThanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (2)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (2)
11-11: Consider using explicit imports instead of wildcard import.Wildcard imports can obscure dependencies and introduce naming conflicts. Consider importing only the specific classes needed (e.g.,
Promise,Future,CompositeFuture,Vertx,AsyncResult).
361-372: Hardcoded successmoduleIdis used even for failed transactions.When
isTransactionSuccessfulisfalse(reprocess count exceeded at lines 313-323), themoduleIdon line 362 is still set toRPR_SENT_TO_REPROCESS_SUCCESS. This seems inconsistent—consider usingdescription.getCode()which would contain the failure code (RPR_RE_PROCESS_FAILED)./** Module-Id can be Both Success/Error code */ - String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); + String moduleId = isTransactionSuccessful + ? PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode() + : description.getCode(); String moduleName = ModuleName.RE_PROCESSOR.toString();
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java(6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/constants/ReprocessorConstants.java (1)
ReprocessorConstants(3-37)
| /** The is batch successful. */ | ||
| boolean isBatchSuccessful; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instance field isBatchSuccessful is not thread-safe and scoped incorrectly.
This field is shared across all invocations of process(). If the scheduler triggers concurrent batch executions, different batches will interfere with each other's state. This should be a local variable within the process method, not an instance field.
- /** The is batch successful. */
- boolean isBatchSuccessful;Move this to a local variable in the process method (though see additional comments about the async race condition).
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java
around lines 100-101, the boolean field isBatchSuccessful is declared as an
instance field but must be moved to a method-local variable inside the process()
method; remove the instance field, declare a local boolean isBatchSuccessful at
the start of process(), update all references to use the local variable, and if
that state must be mutated from inside lambdas/async handlers make it a
method-local AtomicBoolean (or similarly thread-safe holder) so the code
compiles and no shared instance state exists across concurrent process()
invocations.
| regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size()); | ||
| List<Future> futures = new ArrayList<>(); | ||
| reprocessorDtoList.forEach(dto -> { | ||
| String registrationId = dto.getRegistrationId(); | ||
| ridSb.append(registrationId); | ||
| ridSb.append(","); | ||
| MessageDTO messageDTO = new MessageDTO(); | ||
| messageDTO.setRid(registrationId); | ||
| messageDTO.setReg_type(dto.getRegistrationType()); | ||
| messageDTO.setSource(dto.getSource()); | ||
| messageDTO.setIteration(dto.getIteration()); | ||
| messageDTO.setWorkflowInstanceId(dto.getWorkflowInstanceId()); | ||
| if (reprocessCount.equals(dto.getReProcessRetryCount())) { | ||
| dto.setLatestTransactionStatusCode( | ||
| RegistrationTransactionStatusCode.REPROCESS_FAILED.toString()); | ||
| dto.setLatestTransactionTypeCode( | ||
| RegistrationTransactionTypeCode.PACKET_REPROCESS.toString()); | ||
| dto.setStatusComment(StatusUtil.RE_PROCESS_FAILED.getMessage()); | ||
| dto.setStatusCode(RegistrationStatusCode.REPROCESS_FAILED.toString()); | ||
| dto.setSubStatusCode(StatusUtil.RE_PROCESS_FAILED.getCode()); | ||
| messageDTO.setIsValid(false); | ||
| description.setMessage(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getMessage()); | ||
| description.setCode(PlatformSuccessMessages.RPR_RE_PROCESS_FAILED.getCode()); | ||
|
|
||
| } else { | ||
| messageDTO.setIsValid(true); | ||
| isTransactionSuccessful = true; | ||
| String stageName; | ||
| if (isRestartFromStageRequired(dto, reprocessRestartTriggerMap)) { | ||
| stageName = MessageBusUtil.getMessageBusAdress(reprocessRestartFromStage); | ||
| stageName = stageName.concat(ReprocessorConstants.BUS_IN); | ||
| sendAndSetStatus(dto, messageDTO, stageName); | ||
| dto.setStatusComment(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getMessage()); | ||
| dto.setSubStatusCode(StatusUtil.RE_PROCESS_RESTART_FROM_STAGE.getCode()); | ||
| description | ||
| .setMessage( | ||
| PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS | ||
| .getMessage()); | ||
| description.setCode( | ||
| PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_RESTART_FROM_STAGE_SUCCESS | ||
| .getCode()); | ||
|
|
||
| } else { | ||
| stageName = MessageBusUtil.getMessageBusAdress(dto.getRegistrationStageName()); | ||
| if (RegistrationTransactionStatusCode.SUCCESS.name() | ||
| .equalsIgnoreCase(dto.getLatestTransactionStatusCode())) { | ||
| stageName = stageName.concat(ReprocessorConstants.BUS_OUT); | ||
| } else { | ||
| stageName = stageName.concat(ReprocessorConstants.BUS_IN); | ||
| } | ||
| sendAndSetStatus(dto, messageDTO, stageName); | ||
| dto.setStatusComment(StatusUtil.RE_PROCESS_COMPLETED.getMessage()); | ||
| dto.setSubStatusCode(StatusUtil.RE_PROCESS_COMPLETED.getCode()); | ||
| description.setMessage(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getMessage()); | ||
| description.setCode(PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()); | ||
| } | ||
| } | ||
| regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), | ||
| LoggerFileConstant.REGISTRATIONID.toString(), registrationId, description.getMessage()); | ||
|
|
||
| /** Module-Id can be Both Success/Error code */ | ||
| String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); | ||
| String moduleName = ModuleName.RE_PROCESSOR.toString(); | ||
| registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName); | ||
| String eventId = EventId.RPR_402.toString(); | ||
| String eventName = EventName.UPDATE.toString(); | ||
| String eventType = EventType.BUSINESS.toString(); | ||
|
|
||
| if (!isTransactionSuccessful) | ||
| auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, | ||
| eventType, moduleId, moduleName, registrationId); | ||
| Promise<Void> promise = Promise.promise(); | ||
| vertx.executeBlocking(p -> { | ||
| processDTO(reprocessRestartTriggerMap, dto); | ||
| p.complete(); | ||
| }, false, res -> {promise.complete();}); | ||
| futures.add(promise.future()); | ||
| }); | ||
| CompositeFuture.all(futures).onComplete(ar -> { | ||
| regProcLogger.info("Successfully processed count - {}", futures.size()); | ||
| isBatchSuccessful = true; | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical race condition: finally block executes before async processing completes.
CompositeFuture.all(...).onComplete(...) is non-blocking. The finally block (lines 281-297) will execute immediately after this code, before any of the async work completes. This means:
isBatchSuccessfulwill always befalse(or stale from a previous run) when the audit log is written- The audit log will always report failure even for successful batches
- Exceptions in
processDTOare silently swallowed sincepromise.complete()is called unconditionally
To fix, you need to either:
- Option A: Make the processing synchronous using
executeBlockingwithawait/blocking semantics - Option B: Move the audit logging inside the
onCompletecallback - Option C: Use
CompositeFuture.all(futures).toCompletionStage().toCompletableFuture().join()to block until completion
Also fix the raw type and exception handling:
- regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size());
- List<Future> futures = new ArrayList<>();
+ regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size());
+ List<Future<Void>> futures = new ArrayList<>();
+ boolean[] batchSuccess = {true}; // Use array for mutation in lambda
reprocessorDtoList.forEach(dto -> {
Promise<Void> promise = Promise.promise();
vertx.executeBlocking(p -> {
- processDTO(reprocessRestartTriggerMap, dto);
- p.complete();
- }, false, res -> {promise.complete();});
+ try {
+ processDTO(reprocessRestartTriggerMap, dto);
+ p.complete();
+ } catch (Exception e) {
+ p.fail(e);
+ }
+ }, false, res -> {
+ if (res.failed()) {
+ batchSuccess[0] = false;
+ regProcLogger.error("Failed to process DTO", res.cause());
+ }
+ promise.complete();
+ });
futures.add(promise.future());
});Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In registration-processor/.../ReprocessorVerticle.java around lines 247-260 (and
note the finally at 281-297), the current code starts async work with
CompositeFuture.all(...).onComplete(...) but then exits the method so the
finally block runs before async completion, and promises always complete
unconditionally hiding exceptions; fix by (1) using typed futures: List<Future>
-> List<Future> where Future is parameterized (e.g., List<Future> futures = new
ArrayList<>(); with Future<Void>), (2) inside vertx.executeBlocking wrap
processDTO in try/catch and call p.fail(e)/promise.fail(e) on exception and
promise.complete() only on success, and (3) move the audit logging and setting
of isBatchSuccessful into the CompositeFuture.all(...).onComplete callback (set
isBatchSuccessful = ar.succeeded(); log success/failure there and perform
audit/cleanup), so the finally block no longer assumes async work completed.
|
@coderabbitai generate docstrings |
✅ Actions performedInitiated docstring generation; will generate only if new commits exist. |
|
Note Docstrings generation - SUCCESS |
Docstrings generation was requested by @MuralitharanK. * #2234 (comment) The following files were modified: * `registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java`
…e biometric update bug fix code. (mosip#2221) * [MOSIP-43633] Code changes for removing the cyclic dependency from the biometric update bug fix code. Signed-off-by: Ashok Kumar Sharma <ashok@mosip.io> * [MOSIP-43633] - UtilitiesTest to UtilityTest movement Signed-off-by: Nidhi0201 <nidhi.k@cyberpwn.com> --------- Signed-off-by: Ashok Kumar Sharma <ashok@mosip.io> Signed-off-by: Nidhi0201 <nidhi.k@cyberpwn.com> Co-authored-by: Nidhi0201 <nidhi.k@cyberpwn.com> Signed-off-by: Muralitharan K <muralitharan@mosip.io>
Signed-off-by: Muralitharan K <muralitharan@mosip.io>
Signed-off-by: Muralitharan K <muralitharan@mosip.io>
e8a80ab to
353c3a7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
♻️ Duplicate comments (3)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (3)
100-101: Instance fieldisBatchSuccessfulremains incorrectly scoped.This issue was previously flagged. The field is shared across concurrent batch executions and should be a local variable within the
processmethod to avoid interference between concurrent scheduler triggers.
252-270: Race condition persists:finallyblock executes before async processing completes.This issue was previously flagged.
CompositeFuture.all(...).onComplete(...)is non-blocking, so thefinallyblock (lines 291-307) executes immediately while async work is still in progress. This causes:
- Audit logging always uses the initial
isBatchSuccessful = truestate- Async failures logged at line 268 never update
isBatchSuccessfulThe fix requires moving audit/cleanup logic inside the
onCompletecallback or blocking until completion.CompositeFuture.all(futures).onComplete(ar -> { if (ar.succeeded()) { regProcLogger.info("Successfully processed count - {}", futures.size()); } else { + isBatchSuccessful = false; regProcLogger.error("Failed to process some DTOs", ar.cause()); } + // Move audit logging here to ensure it runs after async completion + performAuditLogging(isBatchSuccessful, description); });Note: The above is a partial fix illustration. A complete solution requires restructuring to either block until completion or move all post-processing logic into the callback.
249-249: Use parameterized type instead of rawFuture.Raw types bypass generic type checking and produce compiler warnings.
- List<Future> futures = new ArrayList<>(); + List<Future<Void>> futures = new ArrayList<>();
🧹 Nitpick comments (3)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (3)
11-11: Prefer explicit imports over wildcard import.Using
io.vertx.core.*;imports the entire package. Consider importing only the specific classes used:Vertx,Promise,Future,CompositeFuture,AsyncResult.-import io.vertx.core.*; +import io.vertx.core.AsyncResult; +import io.vertx.core.CompositeFuture; +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx;
311-383: Add exception handling inprocessDTOfor better observability.If any call within
processDTOfails (e.g.,registrationStatusService.updateRegistrationStatusForWorkflowEngine), the exception propagates silently to the executeBlocking handler without logging which specific registration failed. Consider wrapping the method body in try-catch to log theregistrationIdon failure.private void processDTO(Map<String, Set<String>> reprocessRestartTriggerMap, InternalRegistrationStatusDto dto) { - boolean isTransactionSuccessful = false; LogDescription description = new LogDescription(); - String registrationId = dto.getRegistrationId(); + try { MessageDTO messageDTO = new MessageDTO(); messageDTO.setRid(registrationId); // ... rest of the method body ... + } catch (Exception e) { + regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), + LoggerFileConstant.REGISTRATIONID.toString(), registrationId, + "Failed to process DTO: " + e.getMessage()); + throw e; // Re-throw to fail the future + } }
398-403: Inconsistent indentation in the else block.The closing braces have inconsistent indentation which affects readability.
} else { latestTransactionStatusCodeSet = new HashSet<String>(); setReprocessRestartTriggerMap(reprocessRestartTriggerMap, stageName, latestTransactionStatusCode, - latestTransactionStatusCodeSet); - } - } + latestTransactionStatusCodeSet); + } + }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java(6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/constants/ReprocessorConstants.java (1)
ReprocessorConstants(3-37)
| String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); | ||
| String moduleName = ModuleName.RE_PROCESSOR.toString(); | ||
| registrationStatusService.updateRegistrationStatusForWorkflowEngine(dto, moduleId, moduleName); | ||
| String eventId = EventId.RPR_402.toString(); | ||
| String eventName = EventName.UPDATE.toString(); | ||
| String eventType = EventType.BUSINESS.toString(); | ||
|
|
||
| if (!isTransactionSuccessful) { | ||
| auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, | ||
| eventType, moduleId, moduleName, registrationId); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moduleId doesn't reflect failure when isTransactionSuccessful is false.
When reprocess count is exhausted (line 323 condition), isTransactionSuccessful remains false, but moduleId is always set to RPR_SENT_TO_REPROCESS_SUCCESS.getCode(). This is inconsistent with the failure state indicated by description.getCode() set at line 333.
- String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode();
+ String moduleId = isTransactionSuccessful
+ ? PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode()
+ : description.getCode();🤖 Prompt for AI Agents
In
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java
around lines 372 to 382, moduleId is always set to
PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode() even when
isTransactionSuccessful is false; update the logic so moduleId reflects the
failure state (use description.getCode() or an appropriate PlatformErrorMessages
code when isTransactionSuccessful == false) before calling
registrationStatusService.updateRegistrationStatusForWorkflowEngine, and ensure
moduleName/event values remain consistent for the failure branch so the audit
and status update use the correct failure moduleId.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
♻️ Duplicate comments (5)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (5)
100-101: Instance fieldisBatchSuccessfulis not thread-safe.This field is shared across all invocations of
process(). If the scheduler triggers concurrent batch executions, different batches will interfere with each other's state. This should be a local variable (orAtomicBooleanif mutation from lambdas is needed) within theprocessmethod.
264-307: Critical race condition:finallyblock executes before async processing completes.
CompositeFuture.all(...).onComplete(...)is non-blocking. Thefinallyblock (lines 291-307) executes immediately after this code, before any async work completes. This means:
isBatchSuccessfulremainstrue(initial value) when audit log is written- The audit log will report success even for in-progress or failed batches
Move the audit logging inside the
onCompletecallback to ensure it executes after all processing completes.
371-382:moduleIddoesn't reflect failure whenisTransactionSuccessfulis false.When reprocess count is exhausted (line 323),
isTransactionSuccessfulremainsfalse, butmoduleIdis always set toRPR_SENT_TO_REPROCESS_SUCCESS.getCode(). This creates inconsistency with the failure state indescription.- String moduleId = PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode(); + String moduleId = isTransactionSuccessful + ? PlatformSuccessMessages.RPR_SENT_TO_REPROCESS_SUCCESS.getCode() + : description.getCode();
249-249: Use parameterizedFuturetype.
List<Future>is a raw type. UseList<Future<Void>>for type safety and to avoid compiler warnings.- List<Future> futures = new ArrayList<>(); + List<Future<Void>> futures = new ArrayList<>();
251-263: Missing exception handling inexecuteBlocking- failures won't setisBatchSuccessfulto false.If
processDTOthrows an exception, the blocking code fails, butisBatchSuccessfulis never set tofalse. This means the finally block will incorrectly report success even when individual items fail.reprocessorDtoList.forEach(dto -> { Promise<Void> promise = Promise.promise(); vertx.executeBlocking(p -> { - processDTO(reprocessRestartTriggerMap, dto); - p.complete(); + try { + processDTO(reprocessRestartTriggerMap, dto); + p.complete(); + } catch (Exception e) { + regProcLogger.error("Error processing DTO: {}", dto.getRegistrationId(), e); + p.fail(e); + } }, false, res -> { if (res.succeeded()) { promise.complete(); } else { + isBatchSuccessful = false; promise.fail(res.cause()); } }); futures.add(promise.future()); });Note: Even with this fix, there's still a race condition with the
finallyblock (see separate comment). Consider moving audit logging into theonCompletecallback.
🧹 Nitpick comments (6)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
246-270: Parallel processing logic looks good, but ensure proper synchronization.The parallel processing implementation using
vertx.executeBlockingwithordered=falseenables concurrent processing. However, note that:
- The
isBatchSuccessfulfield being modified from multiple threads (if the result handler is called concurrently) requires synchronization or should be anAtomicBoolean.- Consider adding a count of successful vs failed items in the
onCompletecallback for better observability.+ AtomicBoolean batchSuccess = new AtomicBoolean(true); + AtomicInteger failedCount = new AtomicInteger(0); if (!CollectionUtils.isEmpty(reprocessorDtoList)) { - isBatchSuccessful = true; regProcLogger.info("Reprocess count - {}", reprocessorDtoList.size()); - List<Future> futures = new ArrayList<>(); + List<Future<Void>> futures = new ArrayList<>(); reprocessorDtoList.forEach(dto -> { Promise<Void> promise = Promise.promise(); vertx.executeBlocking(p -> { try { processDTO(reprocessRestartTriggerMap, dto); p.complete(); } catch (Exception e) { p.fail(e); } }, false, res -> { if (res.succeeded()) { promise.complete(); } else { + batchSuccess.set(false); + failedCount.incrementAndGet(); promise.fail(res.cause()); } }); futures.add(promise.future()); }); CompositeFuture.all(futures).onComplete(ar -> { if (ar.succeeded()) { regProcLogger.info("Successfully processed count - {}", futures.size()); } else { - regProcLogger.error("Failed to process some DTOs", ar.cause()); + regProcLogger.error("Failed to process {} DTOs", failedCount.get(), ar.cause()); } + // Move audit logging here to execute after all processing completes }); }registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (3)
40-41: Redundant static import.Line 41 imports
parseUTCToLocalDateTimespecifically, but line 40 already imports all static members fromDateUtilsvia the wildcard import. Remove the redundant line.import static io.mosip.kernel.core.util.DateUtils.*; -import static io.mosip.kernel.core.util.DateUtils.parseUTCToLocalDateTime;
696-701: Simplify HashMap initialization.The
forEachwith lambda for copying entries is unnecessarily verbose. Use the copy constructor instead.// Copy "others" metadata if present if(birs.getOthers() != null) { - HashMap<String, String> others = new HashMap<>(); - birs.getOthers().entrySet().forEach(e -> { - others.put(e.getKey(), e.getValue()); - }); + HashMap<String, String> others = new HashMap<>(birs.getOthers()); biometricRecord.setOthers(others); }
803-815: Consider narrowing exception handling.Catching
Exceptionhides the distinction between expected conditions (e.g., UIN not found) and unexpected errors (e.g., network failures). Consider catching specific exception types or at least preserving the original exception in the thrownBiometricClassificationException.public boolean allBiometricHaveException(String rid, String registrationType, ProviderStageName stageName) throws BiometricClassificationException { try { String uin = getUIn(rid, registrationType, stageName); BiometricRecord bm = getBiometricRecordfromIdrepo(uin, rid); return allBiometricHaveException(bm.getSegments(), rid); - } catch (Exception e) { + } catch (IOException | ApisResourceAccessException | PacketManagerException | JsonProcessingException e) { regProcLogger.error(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid, "utility::allBiometricHaveException():: Error while classifying biometric exceptions", e); throw new BiometricClassificationException( PlatformErrorMessages.RPR_BDD_UNABLE_TO_FETCH_BIOMETRIC_INFO.getCode(), - PlatformErrorMessages.RPR_BDD_UNABLE_TO_FETCH_BIOMETRIC_INFO.getMessage()); + PlatformErrorMessages.RPR_BDD_UNABLE_TO_FETCH_BIOMETRIC_INFO.getMessage(), e); } }registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.java (1)
381-381: PacketCreatedOn tests correctly cover NEW/UPDATE/LOST scenarios; consider tightening expectationsThe new stubbing of
utility.retrieveCreatedDateFromPacket(...)and the added tests aroundupdatePacketCreatedOnInDemographicIdentitynicely exercise:
- null mapped-field key,
- null created-date value,
- and skipping for
LOSTregistrations, matching the intended scope forpacketCreatedOn.If you want the tests to be more precise, you could:
- Stub
retrieveCreatedDateFromPacketwith test-specific values instead of a single global constant, or- Assert that
retrieveCreatedDateFromPacketis invoked with the expectedregistrationIdandregistrationTypefor NEW vs UPDATE.This would better align the “metaInfo present” test names with what’s actually being asserted, but is not strictly required for correctness.
Also applies to: 2673-2674, 2685-2707, 2710-2732, 2735-2754, 2757-2776
registration-processor/registration-processor-info-storage-service/src/test/java/io/mosip/registration/processor/packet/storage/utils/UtilityTest.java (1)
63-69: Instance‑based Utility tests look good; consider safer stubbing for the spySwitching this suite to exercise the concrete
Utilityinstance gives you strong coverage over the new helper methods and their edge cases (date parsing, age limits/buffers, packetCreated derivation, biometric exception classification, etc.), which is valuable.One small robustness nit: several tests stub methods on the
@Spyutilityusing thewhen(utility.someMethod(...)).thenReturn(...)form. With spies this pattern can invoke the real implementation during stubbing, which can be fragile if the method gains more dependencies later.Where you are stubbing
utility’s own methods (e.g.,getUIn,getIdVidMetadata, etc.), you may want to switch to thedoReturn(...).when(utility).someMethod(...)style:- when(utility.getUIn(any(), any(), any())).thenReturn("123456789012"); + doReturn("123456789012").when(utility).getUIn(any(), any(), any());Same idea applies to other internal
utilitymethods you mock out. This is optional but can make the tests more resilient to future refactors.Also applies to: 82-85, 100-105, 188-221, 239-266, 300-315, 318-335, 345-365, 367-396, 398-427, 429-455, 485-579, 595-615, 619-622, 625-636, 650-659, 661-667, 670-695, 701-772
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/main/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessor.java(1 hunks)registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/test/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessorTest.java(5 hunks)registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java(2 hunks)registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.java(6 hunks)registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java(3 hunks)registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java(4 hunks)registration-processor/registration-processor-info-storage-service/src/test/java/io/mosip/registration/processor/packet/storage/utils/UtilityTest.java(34 hunks)registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java(6 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-11-18T09:19:25.334Z
Learnt from: ashok-ksharma
Repo: mosip/registration PR: 2209
File: registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java:1168-1177
Timestamp: 2025-11-18T09:19:25.334Z
Learning: In the MOSIP registration processor, the `updatePacketCreatedOnInDemographicIdentity` method in `UinGeneratorStage.java` is intentionally limited to `RegistrationType.NEW` and `RegistrationType.UPDATE` only, because these are the registration types that update biometrics. `RES_UPDATE` and Mapped `UPDATE` do not update biometrics, so they should not trigger the `packetCreatedOn` update logic.
Applied to files:
registration-processor/core-processor/registration-processor-uin-generator-stage/src/test/java/io/mosip/registration/processor/stages/uigenerator/UinGeneratorStageTest.javaregistration-processor/core-processor/registration-processor-bio-dedupe-stage/src/main/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessor.javaregistration-processor/core-processor/registration-processor-bio-dedupe-stage/src/test/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessorTest.javaregistration-processor/registration-processor-info-storage-service/src/test/java/io/mosip/registration/processor/packet/storage/utils/UtilityTest.javaregistration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.javaregistration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.javaregistration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java
🧬 Code graph analysis (2)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java (1)
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/constants/ReprocessorConstants.java (1)
ReprocessorConstants(3-37)
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (3)
registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/util/JsonUtil.java (1)
JsonUtil(34-317)registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/constant/MappingJsonConstants.java (1)
MappingJsonConstants(3-50)registration-processor/registration-processor-core/src/main/java/io/mosip/registration/processor/core/constant/JsonConstant.java (1)
JsonConstant(8-270)
🔇 Additional comments (9)
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utilities.java (1)
5-8: LGTM - Formatting changes only.The changes in this file are limited to import reorganization and minor whitespace adjustments in method signatures. No functional changes.
Also applies to: 541-541, 726-726
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java (5)
58-112: Well-documented configuration properties.The new dependencies and configuration properties are properly injected with clear Javadoc explaining their purpose and default values.
259-297: LGTM - Clear implementation with proper null handling.The method properly validates inputs and throws descriptive exceptions when required data is unavailable. The fallback strategy in
resolveLastPacketProcessedDateprovides multiple ways to determine the packet date.
618-648: LGTM - Fallback pattern with graceful degradation.The method is used as part of a multi-strategy date resolution. Returning
nullon parse errors allows the caller to proceed to the next fallback strategy. Exception details are logged for debugging.
411-417: LGTM - Helper methods are well-implemented.These utility methods have proper null checks, clear logging, and follow consistent patterns for date extraction and validation.
Also applies to: 427-455, 465-480, 505-532, 596-609, 829-847
491-497: LGTM.Simple delegation to
IdRepoService. Callers appropriately handle the nullable return value.registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/test/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessorTest.java (1)
741-754: Updated stubbing againstUtilityaligns with new BioDedupeProcessor behaviorThe switch from
Utilitiesto the injectedUtilityin these tests is consistent with the production changes, and the scenarios (MANUAL_VERIFICATION vs REJECTED, all-biometric-exception, infant, andBiometricClassificationException) remain well covered. No issues from a test-behavior standpoint.Also applies to: 782-795, 820-833, 858-867, 940-947
registration-processor/core-processor/registration-processor-bio-dedupe-stage/src/main/java/io/mosip/registration/processor/biodedupe/stage/BioDedupeProcessor.java (1)
120-122: Utility injection/use in post‑ABIS logic looks consistent and safeThe introduction of the autowired
Utilityand its use inpostAbisIdentification(wasInfantWhenLastPacketProcessed/allBiometricHaveException) preserves the existing decision flow for update packets with no ABIS match, while routing the calls through the shared helper. Exceptions from these methods are already handled at theprocess()level, so there’s no new uncaught path.Also applies to: 469-507
registration-processor/core-processor/registration-processor-uin-generator-stage/src/main/java/io/mosip/registration/processor/stages/uingenerator/stage/UinGeneratorStage.java (1)
190-197: Verify logger call signature and confirm placeholder substitution behaviorThe registration-type guard in
updatePacketCreatedOnInDemographicIdentityis correct—it intentionally limits updates toNEWandUPDATEonly, excludingRES_UPDATEand mapped UPDATE types that do not update biometrics.However, the
regProcLogger.info(...)call at line 1174 uses a{}placeholder with five arguments:regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), registrationId, "Skipping update of packetCreatedOn. registrationType: {}", object.getReg_type());MOSIP's kernel logger uses a custom wrapper for structured logging (sessionId, registrationId). Verify whether this logger implementation supports SLF4J-style
{}placeholder substitution with varargs. If not, use string concatenation instead:regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), registrationId, "Skipping update of packetCreatedOn. registrationType: " + object.getReg_type());
|
|
||
| // Check if date is older than 100 years | ||
| LocalDateTime hundredYearsAgo = now.minusYears(200); | ||
| if (before(ldt, hundredYearsAgo)) { | ||
| regProcLogger.error("Date is older than 200 years : {}", ldt); | ||
| return null; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent comment: says 100 years but code checks 200 years.
The comment states "Check if date is older than 100 years" but the code actually checks for 200 years. Update the comment to match the implementation.
- // Check if date is older than 100 years
+ // Check if date is older than 200 years
LocalDateTime hundredYearsAgo = now.minusYears(200);Also consider renaming hundredYearsAgo to twoHundredYearsAgo for consistency.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| // Check if date is older than 100 years | |
| LocalDateTime hundredYearsAgo = now.minusYears(200); | |
| if (before(ldt, hundredYearsAgo)) { | |
| regProcLogger.error("Date is older than 200 years : {}", ldt); | |
| return null; | |
| } | |
| // Check if date is older than 200 years | |
| LocalDateTime hundredYearsAgo = now.minusYears(200); | |
| if (before(ldt, hundredYearsAgo)) { | |
| regProcLogger.error("Date is older than 200 years : {}", ldt); | |
| return null; | |
| } |
🤖 Prompt for AI Agents
In
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java
around lines 574 to 580, the inline comment incorrectly says "100 years" while
the code checks for 200 years; change the comment to state "Check if date is
older than 200 years" and rename the local variable hundredYearsAgo to
twoHundredYearsAgo (and update all its references) so the identifier matches the
check and maintain consistency with the existing log message.
| for (BIR bir : birs) { | ||
| BiometricType type = bir.getBdbInfo().getType().get(0); | ||
| boolean isFaceOrExceptionPhoto = type == BiometricType.FACE || type == BiometricType.EXCEPTION_PHOTO; | ||
|
|
||
| regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid, | ||
| "utility::allBiometricHaveException():: Checking biometric type: {} ",type); | ||
|
|
||
| if (hasOthers) { | ||
| if (!isFaceOrExceptionPhoto) { | ||
| String exceptionValue = bir.getOthers().get(EXCEPTION); | ||
| regProcLogger.debug(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid, | ||
| "utility::allBiometricHaveException():: Biometric type: {}, exceptionValue: {}", type, exceptionValue); | ||
|
|
||
| if (exceptionValue == null || !exceptionValue.equalsIgnoreCase(TRUE)) { | ||
| regProcLogger.info(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid, | ||
| "utility::allBiometricHaveException():: Biometric type: {} does not have exception", type); | ||
| return false; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential NullPointerException risks in biometric processing.
Two NPE concerns:
-
Line 747:
bir.getBdbInfo().getType().get(0)- No null checks. IfgetBdbInfo()is null,getType()is null, or the list is empty, this will throw. -
Line 755:
bir.getOthers().get(EXCEPTION)-hasOtherschecks if any BIR in the list has others, but this specificbirmight havenullforgetOthers().
for (BIR bir : birs) {
- BiometricType type = bir.getBdbInfo().getType().get(0);
+ if (bir.getBdbInfo() == null || bir.getBdbInfo().getType() == null || bir.getBdbInfo().getType().isEmpty()) {
+ regProcLogger.warn(LoggerFileConstant.SESSIONID.toString(), LoggerFileConstant.REGISTRATIONID.toString(), rid,
+ "utility::allBiometricHaveException():: Skipping BIR with missing BdbInfo or Type");
+ continue;
+ }
+ BiometricType type = bir.getBdbInfo().getType().get(0);
boolean isFaceOrExceptionPhoto = type == BiometricType.FACE || type == BiometricType.EXCEPTION_PHOTO;
// ... later in the code
if (hasOthers) {
if (!isFaceOrExceptionPhoto) {
- String exceptionValue = bir.getOthers().get(EXCEPTION);
+ Map<String, String> birOthers = bir.getOthers();
+ String exceptionValue = (birOthers != null) ? birOthers.get(EXCEPTION) : null;🤖 Prompt for AI Agents
In
registration-processor/registration-processor-info-storage-service/src/main/java/io/mosip/registration/processor/packet/storage/utils/Utility.java
around lines 746 to 763, the code can throw NPEs when bir.getBdbInfo(),
bir.getBdbInfo().getType() is null or empty, or when bir.getOthers() is null for
the current bir; add defensive null/empty checks before dereferencing: first
verify bir!=null && bir.getBdbInfo()!=null && bir.getBdbInfo().getType()!=null
&& !bir.getBdbInfo().getType().isEmpty() before accessing get(0) and skip or
treat missing type as not matching FACE/EXCEPTION_PHOTO; second, ensure
bir.getOthers()!=null and bir.getOthers().containsKey(EXCEPTION) before calling
get(EXCEPTION) and treat absent exception entries as not TRUE (log and return
false per current behavior). Update debug/info logs to reflect when values are
missing to aid tracing.
| String eventId = EventId.RPR_402.toString(); | ||
| String eventName = EventName.UPDATE.toString(); | ||
| String eventType = EventType.BUSINESS.toString(); | ||
|
|
||
| if (!isTransactionSuccessful) { | ||
| auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, | ||
| eventType, moduleId, moduleName, registrationId); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Audit event codes are inconsistent with failure state.
When isTransactionSuccessful is false, the audit is logged with success event codes (RPR_402, UPDATE, BUSINESS) instead of failure codes. This should mirror the logic in the batch-level finally block.
- String eventId = EventId.RPR_402.toString();
- String eventName = EventName.UPDATE.toString();
- String eventType = EventType.BUSINESS.toString();
+ String eventId = isTransactionSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString();
+ String eventName = isTransactionSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString();
+ String eventType = isTransactionSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString();
if (!isTransactionSuccessful) {
auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName,
eventType, moduleId, moduleName, registrationId);
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| String eventId = EventId.RPR_402.toString(); | |
| String eventName = EventName.UPDATE.toString(); | |
| String eventType = EventType.BUSINESS.toString(); | |
| if (!isTransactionSuccessful) { | |
| auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, | |
| eventType, moduleId, moduleName, registrationId); | |
| } | |
| String eventId = isTransactionSuccessful ? EventId.RPR_402.toString() : EventId.RPR_405.toString(); | |
| String eventName = isTransactionSuccessful ? EventName.UPDATE.toString() : EventName.EXCEPTION.toString(); | |
| String eventType = isTransactionSuccessful ? EventType.BUSINESS.toString() : EventType.SYSTEM.toString(); | |
| if (!isTransactionSuccessful) { | |
| auditLogRequestBuilder.createAuditRequestBuilder(description.getMessage(), eventId, eventName, | |
| eventType, moduleId, moduleName, registrationId); | |
| } |
🤖 Prompt for AI Agents
In
registration-processor/workflow-engine/registration-processor-reprocessor/src/main/java/io/mosip/registration/processor/reprocessor/verticle/ReprocessorVerticle.java
around lines 375 to 382, the audit uses success event constants when
isTransactionSuccessful is false; replace the success constants
(EventId.RPR_402, EventName.UPDATE, EventType.BUSINESS) with the corresponding
failure constants used in the batch-level finally block (the failure
EventId/EventName/EventType defined in the same enums) so the
auditLogRequestBuilder.createAuditRequestBuilder call logs the failure event
codes and names instead of success ones.
Summary by CodeRabbit
Performance Improvements
Bug Fixes
Improvements
Refactor
Tests
✏️ Tip: You can customize this high-level summary in your review settings.