diff --git a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java index 35f980a..e5722a4 100644 --- a/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java +++ b/src/main/java/fi/hsl/transitdata/vehicleposition/application/VehiclePositionHandler.java @@ -13,6 +13,7 @@ import fi.hsl.transitdata.vehicleposition.application.gtfsrt.GtfsRtGenerator; import fi.hsl.transitdata.vehicleposition.application.gtfsrt.GtfsRtOccupancyStatusHelper; import fi.hsl.transitdata.vehicleposition.application.utils.PassengerCountCache; +import fi.hsl.transitdata.vehicleposition.application.utils.SeqCache; import fi.hsl.transitdata.vehicleposition.application.utils.TripVehicleCache; import fi.hsl.transitdata.vehicleposition.application.utils.VehicleTimestampValidator; import org.apache.pulsar.client.api.*; @@ -37,6 +38,7 @@ public class VehiclePositionHandler implements IMessageHandler { private final Config config; private final TripVehicleCache tripVehicleCache; + private final SeqCache seqCache; private final StopStatusProcessor stopStatusProcessor; private final VehicleTimestampValidator vehicleTimestampValidator; @@ -55,6 +57,7 @@ public VehiclePositionHandler(final PulsarApplicationContext context) { config = context.getConfig(); tripVehicleCache = new TripVehicleCache(); + seqCache = new SeqCache(); stopStatusProcessor = new StopStatusProcessor(); vehicleTimestampValidator = new VehicleTimestampValidator(config.getDuration("processor.vehicleposition.maxTimeDifference", TimeUnit.SECONDS)); @@ -117,6 +120,11 @@ public void handleMessage(Message message) { return; } + //Produce vehicle positions only for the vehicle that has smallest seq + if (data.getPayload().hasSeq() && !seqCache.isSmallestSeq(data.getTopic().getUniqueVehicleId(), data.getPayload().getSeq())) { + return; + } + //If some other vehicle was registered for the trip, do not produce vehicle position if (!tripVehicleCache.registerVehicleForTrip(data.getTopic().getUniqueVehicleId(), data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir())) { log.debug("There was already a vehicle registered for trip {} / {} / {} / {} - not producing vehicle position message for {}", data.getTopic().getRouteId(), data.getPayload().getOday(), data.getTopic().getStartTime(), data.getPayload().getDir(), data.getTopic().getUniqueVehicleId()); diff --git a/src/main/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCache.java b/src/main/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCache.java new file mode 100644 index 0000000..817bda2 --- /dev/null +++ b/src/main/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCache.java @@ -0,0 +1,24 @@ +package fi.hsl.transitdata.vehicleposition.application.utils; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Scheduler; + +import java.util.concurrent.TimeUnit; + +/** + * Cache for saving smallest sequence number seen from a certain vehicle. + * This is needed for producing vehicle positions only from the first vehicle. + */ +public class SeqCache { + private final Cache smallestSeqCache = Caffeine + .newBuilder() + .expireAfterWrite(15, TimeUnit.MINUTES).scheduler(Scheduler.systemScheduler()) + .build(); + + public boolean isSmallestSeq(String uniqueVehicleId, int seq) { + final int smallestSeq = smallestSeqCache.asMap().compute(uniqueVehicleId, (key, prev) -> (prev == null || seq <= prev) ? seq : prev); + + return seq == smallestSeq; + } +} diff --git a/src/test/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCacheTest.java b/src/test/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCacheTest.java new file mode 100644 index 0000000..9a2dc31 --- /dev/null +++ b/src/test/java/fi/hsl/transitdata/vehicleposition/application/utils/SeqCacheTest.java @@ -0,0 +1,18 @@ +package fi.hsl.transitdata.vehicleposition.application.utils; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SeqCacheTest { + @Test + public void testSeqCache() { + SeqCache seqCache = new SeqCache(); + + assertTrue(seqCache.isSmallestSeq("1", 7)); + assertFalse(seqCache.isSmallestSeq("1", 9)); + assertTrue(seqCache.isSmallestSeq("1", 7)); + assertTrue(seqCache.isSmallestSeq("1", 1)); + } +}