diff --git a/pom.xml b/pom.xml index 0def54a..a08f8bb 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 2.9.0-685b9893b7 + 2.9.6-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSender.java b/src/main/java/com/uid2/optout/vertx/OptOutSender.java index 81e08c8..fd4cb0d 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSender.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSender.java @@ -29,6 +29,7 @@ import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -404,11 +405,30 @@ private void deltaReplay(Promise promise, OptOutCollection store, List lastOp = Future.succeededFuture(); + var ref = new Object() { + Future lastOp = Future.succeededFuture(); + }; + + List> sendOps = new ArrayList>(); + + long periodicId = this.vertx.setPeriodic(1000, l -> { + int completeSendOps = 0; + for (Future sendOp : sendOps) { + if (sendOp.isComplete()) { + completeSendOps++; + } + } + + this.logger.info("Completed " + completeSendOps + " send ops out of " + sendOps.size()); + this.logger.info("LastOp is complete: " + ref.lastOp.isComplete()); + }); + for (int i = 0; i < store.size(); ++i) { final OptOutEntry entry = store.get(i); - lastOp = lastOp.compose(ar -> { + ref.lastOp = ref.lastOp.compose(ar -> { Future sendOp = this.remotePartner.send(entry); + sendOp.timeout(1, TimeUnit.MINUTES); + sendOps.add(sendOp); return sendOp.onComplete(v -> { if (v.succeeded()) { recordEntryReplayStatus("success"); @@ -428,7 +448,8 @@ private void deltaReplay(Promise promise, OptOutCollection store, List { + ref.lastOp.onComplete(ar -> { + this.vertx.cancelTimer(periodicId); if (ar.failed()) { this.logger.error("deltaReplay failed sending delta " + filenames + " to remote: " + this.remotePartner.name(), ar.cause()); this.logger.error("deltaReplay has " + this.pendingFilesCount.get() + " pending file");