-
Notifications
You must be signed in to change notification settings - Fork 3
DMP-4312: Dets Clean up automated Task #3216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
a88bc73
cad11b7
d02df8d
beb8099
7097716
f61bc10
fd5930d
7b169b1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| package uk.gov.hmcts.darts.arm.service; | ||
|
|
||
| import uk.gov.hmcts.darts.task.config.CleanUpDetsDataAutomatedTaskConfig; | ||
|
|
||
| @FunctionalInterface | ||
| public interface CleanUpDetsDataProcessor { | ||
| void processCleanUpDetsData(int batchSize, CleanUpDetsDataAutomatedTaskConfig minimumStoredAge); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,143 @@ | ||
| package uk.gov.hmcts.darts.arm.service.impl; | ||
|
|
||
| import lombok.AllArgsConstructor; | ||
| import lombok.Getter; | ||
| import lombok.RequiredArgsConstructor; | ||
| import lombok.Setter; | ||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.apache.commons.collections4.CollectionUtils; | ||
| import org.apache.commons.collections4.ListUtils; | ||
| import org.springframework.stereotype.Component; | ||
| import org.springframework.stereotype.Service; | ||
| import org.springframework.transaction.annotation.Propagation; | ||
| import org.springframework.transaction.annotation.Transactional; | ||
| import uk.gov.hmcts.darts.arm.service.CleanUpDetsDataProcessor; | ||
| import uk.gov.hmcts.darts.common.repository.ExternalObjectDirectoryRepository; | ||
| import uk.gov.hmcts.darts.common.repository.ObjectStateRecordRepository; | ||
| import uk.gov.hmcts.darts.dets.service.DetsApiService; | ||
| import uk.gov.hmcts.darts.task.config.CleanUpDetsDataAutomatedTaskConfig; | ||
| import uk.gov.hmcts.darts.util.AsyncUtil; | ||
|
|
||
| import java.time.Clock; | ||
| import java.time.OffsetDateTime; | ||
| import java.util.ArrayList; | ||
| import java.util.List; | ||
| import java.util.concurrent.Callable; | ||
|
|
||
| @Slf4j | ||
| @Service | ||
| @RequiredArgsConstructor | ||
| public class CleanUpDetsDataProcessorImpl implements CleanUpDetsDataProcessor { | ||
|
|
||
| private final CleanUpDetsDataBatchProcessor cleanUpDetsDataBatchProcessor; | ||
| private final Clock clock; | ||
|
|
||
| @Override | ||
| //Required for async batch processing | ||
| @SuppressWarnings({"PMD.DoNotUseThreads"}) | ||
| public void processCleanUpDetsData(int batchSize, CleanUpDetsDataAutomatedTaskConfig config) { | ||
| log.info("Processing clean up of DETS data with batch size: {}", batchSize); | ||
|
|
||
| OffsetDateTime minimumStoredAge = OffsetDateTime.now(clock).minus(config.getMinimumStoredAge()); | ||
| int chunkSize = config.getChunkSize(); | ||
|
|
||
| int totalProcessed = 0; | ||
|
|
||
| while (totalProcessed < batchSize && chunkSize > 0) { | ||
| log.info("Processing clean up of DETS data with chunk size: {}", chunkSize); | ||
|
|
||
| List<CleanUpDetsDataProcessorImpl.CleanUpDetsProcedureResponse> eodIdsToCleanUp = | ||
| cleanUpDetsDataBatchProcessor.callDetsCleanUpStoredProcedure(chunkSize, minimumStoredAge); | ||
|
|
||
| if (eodIdsToCleanUp.isEmpty()) { | ||
| log.info("No more DETS data to clean up. Ending process."); | ||
| break; | ||
| } | ||
|
|
||
| List<List<CleanUpDetsDataProcessorImpl.CleanUpDetsProcedureResponse>> batchesToDeleteBlobStoreRecordFor = | ||
| ListUtils.partition(eodIdsToCleanUp, config.getChunkSize() / config.getThreads()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This calculation "outconfig.getChunkSize() / config.getThreads()" should not be passed as a parameter to ensure there are no issues with the value returned |
||
|
|
||
| List<Callable<Void>> tasks = batchesToDeleteBlobStoreRecordFor | ||
| .stream() | ||
| .map(eodsForBatch -> (Callable<Void>) () -> { | ||
| cleanUpDetsDataBatchProcessor.process(eodsForBatch); | ||
| return null; | ||
| }) | ||
| .toList(); | ||
|
|
||
| try { | ||
| AsyncUtil.invokeAllAwaitTermination(tasks, config); | ||
| } catch (InterruptedException e) { | ||
| log.error("Clean up dets data batch processing interrupted", e); | ||
| Thread.currentThread().interrupt(); | ||
| } catch (Exception e) { | ||
| log.error("Clean up dets data unexpected exception", e); | ||
| } | ||
| //Update total processed count and adjust chunk size for next iteration if needed | ||
| totalProcessed += eodIdsToCleanUp.size(); | ||
| //Ensure we do not exceed the batch size in the next iteration | ||
| //Takes into account the possibility that the procedure may return more records than requested | ||
| if (totalProcessed + chunkSize > batchSize) { | ||
| chunkSize = batchSize - totalProcessed; | ||
| } | ||
| log.info("Processed batch of DETS data clean up. Total processed so far: {}. Batch size: {}", totalProcessed, eodIdsToCleanUp.size()); | ||
| } | ||
| log.info("Completed processing clean up of DETS data. Total processed: {}", totalProcessed); | ||
| } | ||
|
|
||
| @Component | ||
| @RequiredArgsConstructor | ||
| public static class CleanUpDetsDataBatchProcessor { | ||
|
|
||
| private final ExternalObjectDirectoryRepository externalObjectDirectoryRepository; | ||
| private final ObjectStateRecordRepository objectStateRecordRepository; | ||
| private final DetsApiService detsApiService; | ||
|
|
||
| @Transactional(propagation = Propagation.REQUIRES_NEW) | ||
| public void process(List<CleanUpDetsDataProcessorImpl.CleanUpDetsProcedureResponse> eodsCleanedUp) { | ||
| List<Long> objectStateRecordsForDetsRecordsCleanedUpSuccessfully = new ArrayList<>(); | ||
| List<Long> objectStateRecordsForDetsRecordsFailedToCleanUp = new ArrayList<>(); | ||
|
|
||
| if (eodsCleanedUp.isEmpty()) { | ||
| return; | ||
| } | ||
|
|
||
| for (CleanUpDetsProcedureResponse response : eodsCleanedUp) { | ||
| try { | ||
| log.debug("Processing clean up response for EOD ID: {}, Location: {}", response.getOsrUuid(), response.getDetsLocation()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have the functionality for feature flag logging, can you please change these debug messages to be feature flag controlled so we can turn them on and off when required. I will find an example of how this is currently done |
||
| if (detsApiService.deleteBlobDataFromContainer(response.getDetsLocation())) { | ||
| log.debug("Successfully deleted DETS blob for EOD ID: {}, Location: {}", response.getOsrUuid(), response.getDetsLocation()); | ||
| objectStateRecordsForDetsRecordsCleanedUpSuccessfully.add(response.getOsrUuid()); | ||
| continue; | ||
| } else { | ||
| log.error("Failed to delete DETS blob for EOD ID: {}, Location: {}. Blob may not exist or deletion failed.", | ||
| response.getOsrUuid(), response.getDetsLocation()); | ||
| } | ||
| } catch (Exception exception) { | ||
| log.error("Failed to delete DETS blob for EOD Location: {}, object state record id: {}.", | ||
| response.getDetsLocation(), response.getOsrUuid(), exception); | ||
| } | ||
| objectStateRecordsForDetsRecordsFailedToCleanUp.add(response.getOsrUuid()); | ||
| } | ||
| objectStateRecordRepository.markDetsCleanupStatusAsComplete(objectStateRecordsForDetsRecordsCleanedUpSuccessfully); | ||
| log.info("Marked object state records as clean up complete for EOD IDs: {}", objectStateRecordsForDetsRecordsCleanedUpSuccessfully); | ||
|
|
||
| if (CollectionUtils.isNotEmpty(objectStateRecordsForDetsRecordsFailedToCleanUp)) { | ||
| log.error("Dets clean up failed for Object state record Ids: {}", objectStateRecordsForDetsRecordsFailedToCleanUp); | ||
| } | ||
| } | ||
|
|
||
| @Transactional(propagation = Propagation.REQUIRES_NEW) | ||
| public List<CleanUpDetsDataProcessorImpl.CleanUpDetsProcedureResponse> callDetsCleanUpStoredProcedure(int chunkSize, OffsetDateTime minimumStoredAge) { | ||
| return objectStateRecordRepository.cleanUpDetsDataProcedure(chunkSize, minimumStoredAge); | ||
| } | ||
| } | ||
|
|
||
| @AllArgsConstructor | ||
| @Getter | ||
| @Setter | ||
| public static class CleanUpDetsProcedureResponse { | ||
| private Long osrUuid; | ||
| private String detsLocation; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,10 +1,27 @@ | ||
| package uk.gov.hmcts.darts.common.repository; | ||
|
|
||
| import org.springframework.data.jpa.repository.JpaRepository; | ||
| import org.springframework.data.jpa.repository.Modifying; | ||
| import org.springframework.data.jpa.repository.Query; | ||
| import org.springframework.data.repository.query.Param; | ||
| import uk.gov.hmcts.darts.arm.service.impl.CleanUpDetsDataProcessorImpl; | ||
| import uk.gov.hmcts.darts.common.entity.ObjectStateRecordEntity; | ||
|
|
||
| import java.time.OffsetDateTime; | ||
| import java.util.List; | ||
| import java.util.Optional; | ||
|
|
||
| public interface ObjectStateRecordRepository extends JpaRepository<ObjectStateRecordEntity, Long> { | ||
| Optional<ObjectStateRecordEntity> findByArmEodId(long armEodId); | ||
|
|
||
| @Modifying | ||
| @Query("UPDATE ObjectStateRecordEntity o SET o.flagFileDetsCleanupStatus = true where o.uuid in :uuids") | ||
| void markDetsCleanupStatusAsComplete(List<Long> uuids); | ||
|
|
||
|
|
||
| @Query(value = "SELECT osr_uuid AS osrUuid, dets_location AS detsLocation " + | ||
| "FROM dets_cleanup_eod_osr_rows(:limit, :last_modified_before_ts)", nativeQuery = true) | ||
| List<CleanUpDetsDataProcessorImpl.CleanUpDetsProcedureResponse> cleanUpDetsDataProcedure( | ||
| @Param("limit") Integer limit, @Param("last_modified_before_ts") OffsetDateTime lastModifiedBefore); | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| package uk.gov.hmcts.darts.task.config; | ||
|
|
||
| import lombok.Getter; | ||
| import lombok.Setter; | ||
| import org.springframework.boot.context.properties.ConfigurationProperties; | ||
| import org.springframework.context.annotation.Configuration; | ||
|
|
||
| import java.time.Duration; | ||
|
|
||
| @ConfigurationProperties("darts.automated.task.clean-up-dets-data") | ||
| @Getter | ||
| @Setter | ||
| @Configuration | ||
| public class CleanUpDetsDataAutomatedTaskConfig extends AbstractAsyncAutomatedTaskConfig { | ||
|
|
||
| private Duration minimumStoredAge; | ||
| private int chunkSize; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| package uk.gov.hmcts.darts.task.runner.impl; | ||
|
|
||
| import lombok.extern.slf4j.Slf4j; | ||
| import org.springframework.beans.factory.annotation.Autowired; | ||
| import org.springframework.stereotype.Component; | ||
| import uk.gov.hmcts.darts.arm.service.CleanUpDetsDataProcessor; | ||
| import uk.gov.hmcts.darts.common.repository.AutomatedTaskRepository; | ||
| import uk.gov.hmcts.darts.log.api.LogApi; | ||
| import uk.gov.hmcts.darts.task.api.AutomatedTaskName; | ||
| import uk.gov.hmcts.darts.task.config.CleanUpDetsDataAutomatedTaskConfig; | ||
| import uk.gov.hmcts.darts.task.runner.AutoloadingManualTask; | ||
| import uk.gov.hmcts.darts.task.service.LockService; | ||
|
|
||
| import static uk.gov.hmcts.darts.task.api.AutomatedTaskName.CLEAN_UP_DETS_DATA; | ||
|
|
||
| @Slf4j | ||
| @Component | ||
| public class CleanUpDetsDataAutomatedTask | ||
| extends AbstractLockableAutomatedTask<CleanUpDetsDataAutomatedTaskConfig> | ||
| implements AutoloadingManualTask { | ||
|
|
||
| private final CleanUpDetsDataProcessor cleanUpDetsDataProcessor; | ||
|
|
||
| @Autowired | ||
| public CleanUpDetsDataAutomatedTask(AutomatedTaskRepository automatedTaskRepository, | ||
| CleanUpDetsDataAutomatedTaskConfig automatedTaskConfigurationProperties, | ||
| CleanUpDetsDataProcessor processor, | ||
| LogApi logApi, LockService lockService) { | ||
| super(automatedTaskRepository, automatedTaskConfigurationProperties, logApi, lockService); | ||
| this.cleanUpDetsDataProcessor = processor; | ||
| } | ||
|
|
||
| @Override | ||
| public AutomatedTaskName getAutomatedTaskName() { | ||
| return CLEAN_UP_DETS_DATA; | ||
| } | ||
|
|
||
| @Override | ||
| protected void runTask() { | ||
| cleanUpDetsDataProcessor.processCleanUpDetsData(getAutomatedTaskBatchSize(), getConfig()); | ||
| } | ||
| } |
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. migrations need to be updated as we have had various other flyway migration merged |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| INSERT INTO darts.automated_task (aut_id, task_name, task_description, cron_expression, cron_editable, batch_size, | ||
| created_ts, created_by, last_modified_ts, last_modified_by, task_enabled) | ||
| VALUES (36, 'CleanUpDetsData', 'Cleans up Dets files that have successfully been stored in ARM', '0 24 0-6,19-23 ? * *', true, 100_000, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this should not use a fixed value 36 for id but should use nextval('aut_seq') as we have added more automated tasks |
||
| current_timestamp, 0, current_timestamp, 0, false); | ||
|
|
||
| INSERT INTO user_account (usr_id, user_name, user_email_address, description, created_ts, last_modified_ts, last_modified_by, created_by, is_system_user, | ||
| is_active, user_full_name) | ||
|
|
||
| VALUES (-51, 'systemCleanUpDetsDataAutomatedTask', 'systemCleanUpDetsDataAutomatedTask@hmcts.net', 'systemCleanUpDetsDataAutomatedTask', | ||
| '2024-01-01 00:00:00+00', '2024-01-01 00:00:00+00', 0, 0, true, true, 'systemCleanUpDetsDataAutomatedTask'); | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the callDetsCleanUpStoredProcedure throws an exception say because there are duplicate eods, then the whole job finishes. Should there be a try catch inside the while loop so the next chunk can be processed?