From b137c97cf9dfbb736721b1171d4008ad3f5ec372 Mon Sep 17 00:00:00 2001 From: Jason Cromer Date: Wed, 13 Feb 2019 14:33:00 -0800 Subject: [PATCH 1/7] First pass at implementing working json parser --- ingest/build.gradle | 1 + .../matcher/ingest/Ingester.java | 14 ++- .../matcher/ingest/input/JsonDTO.java | 29 ++++++ .../matcher/ingest/input/JsonInputFormat.java | 98 +++++++++++++++++++ .../matcher/ingest/input/JsonParser.java | 28 ++++++ .../ingest/input/gpx/EventTypeDTO.java | 21 ++++ 6 files changed, 186 insertions(+), 5 deletions(-) create mode 100644 ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonDTO.java create mode 100644 ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonInputFormat.java create mode 100644 ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonParser.java create mode 100644 ingest/src/main/java/io/sharedstreets/matcher/ingest/input/gpx/EventTypeDTO.java diff --git a/ingest/build.gradle b/ingest/build.gradle index 9fc160d..8de2d37 100644 --- a/ingest/build.gradle +++ b/ingest/build.gradle @@ -25,6 +25,7 @@ dependencies { compile "com.google.protobuf:protobuf-java:${protobufversion}" compile "com.jsoniter:jsoniter:0.9.15" compile 'commons-cli:commons-cli:1.4' + implementation 'com.google.code.gson:gson:2.8.5' compile 'com.esri.geometry:esri-geometry-api:1.2.1' compile 'joda-time:joda-time:2.9.9' diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java index 116abb1..90f66ef 100644 --- a/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java @@ -3,6 +3,7 @@ import io.sharedstreets.matcher.ingest.input.CsvEventExtractor; import io.sharedstreets.matcher.ingest.input.DcfhvEventExtractor; import io.sharedstreets.matcher.ingest.input.JsonEventExtractor; +import io.sharedstreets.matcher.ingest.input.JsonInputFormat; import 
io.sharedstreets.matcher.ingest.input.gpx.GpxInputFormat; import io.sharedstreets.matcher.ingest.model.Ingest; import io.sharedstreets.matcher.ingest.model.InputEvent; @@ -13,6 +14,7 @@ import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +142,8 @@ else if(fileParts[fileParts.length-1].toLowerCase().equals("dcfhv")) if (finalInputType.equals("GPX")) { inputEvents = env.createInput(new GpxInputFormat(inputPath, gpsSpeeds, finalVerbose)); + } else if (finalInputType.equals("JSON")) { + inputEvents = env.createInput(new JsonInputFormat(new org.apache.flink.core.fs.Path(inputPath))); } else { DataSet inputStream = env.readTextFile(inputPath); @@ -202,10 +206,10 @@ public void flatMap(String value, Collector out) throws Exception { // Path dataPath = Paths.get(outputPath, "event_data").toAbsolutePath(); - if(dataPath.toFile().exists()) { - System.out.print("File already exists: " + outputPath.toString()); - return; - } +// if(dataPath.toFile().exists()) { +// System.out.print("File already exists: " + outputPath.toString()); +// return; +// } // write protobuf of traces inputEvents.write(new FileOutputFormat() { @@ -214,7 +218,7 @@ public void writeRecord(InputEvent record) throws IOException { Ingest.InputEventProto proto = record.toProto(); proto.writeDelimitedTo(this.stream); } - }, dataPath.toString()).setParallelism(1); + }, dataPath.toString(), FileSystem.WriteMode.OVERWRITE).setParallelism(1); env.execute("process"); diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonDTO.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonDTO.java new file mode 100644 index 0000000..015c59d --- /dev/null +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonDTO.java @@ -0,0 
+1,29 @@ +package io.sharedstreets.matcher.ingest.input; + +import com.google.gson.annotations.SerializedName; + +import java.util.List; + +public class JsonDTO { + + @SerializedName("eventData") + List eventData; + + public class EventTypeDTO { + + @SerializedName("vehicleId") + public String vehicleId; + + @SerializedName("timeStamp") + public Long timeStamp; + + @SerializedName("latitude") + public double latitude; + + @SerializedName("longitude") + public double longitude; + + @SerializedName("eventType") + public String eventType; + } +} diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonInputFormat.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonInputFormat.java new file mode 100644 index 0000000..eb7ff7d --- /dev/null +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonInputFormat.java @@ -0,0 +1,98 @@ +package io.sharedstreets.matcher.ingest.input; + +import io.sharedstreets.matcher.ingest.model.InputEvent; +import io.sharedstreets.matcher.ingest.model.Point; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalFileStatus; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +public class JsonInputFormat extends FileInputFormat { + + private final AtomicInteger numFiles = new AtomicInteger(0); + private final AtomicInteger currentFile = new AtomicInteger(0); + private static AtomicReference fileContents = new AtomicReference<>(new JsonDTO()); + + public JsonInputFormat(Path filePath) { + super(filePath); + } + + @Override + public FileInputSplit[] createInputSplits(int 
minNumSplits) throws IOException { + if (minNumSplits < 1) { + throw new IllegalArgumentException("Number of input splits has to be at least 1."); + } + + // take the desired number of splits into account + minNumSplits = Math.max(minNumSplits, this.numSplits); + + final Path path = this.filePath; + + final List inputSplits = new ArrayList<>(minNumSplits); + + final FileSystem fs = this.filePath.getFileSystem(); + int splitNum = 0; + File dir = new File(path.getPath()); + File[] files = dir.listFiles(); + + if (files != null) { + for (File f : files) { + + FileStatus file = new LocalFileStatus(f, fs); + FileInputSplit split = new FileInputSplit(splitNum++, file.getPath(), 0, file.getLen(), + null); + inputSplits.add(split); + + } + } + + return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); + } + + @Override + public void open(FileInputSplit fileSplit) throws IOException { + super.open(fileSplit); + String pathToFile = filePath + "/" + fileSplit.getPath().getName(); + + JsonDTO result = JsonParser.parseJson(pathToFile); + fileContents = new AtomicReference<>(result); + numFiles.compareAndSet(0, result.eventData.size()); + } + + @Override + public boolean reachedEnd() { + return currentFile.get() >= numFiles.get(); + } + + @Override + public InputEvent nextRecord(InputEvent reuse) { + int index = foo(); + + reuse.vehicleId = fileContents.get().eventData.get(index).vehicleId; + reuse.time = fileContents.get().eventData.get(index).timeStamp; + reuse.point = new Point(fileContents.get().eventData.get(index).longitude, fileContents.get().eventData.get(index).latitude); + + if (fileContents.get().eventData.get(index).eventType != null) { + HashMap event = new HashMap<>(); + event.put(fileContents.get().eventData.get(index).eventType, 0.0); + reuse.eventData = event; + } + + return reuse; + } + + synchronized int foo() { + return currentFile.getAndIncrement(); + } +} diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonParser.java 
b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonParser.java new file mode 100644 index 0000000..3856f97 --- /dev/null +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonParser.java @@ -0,0 +1,28 @@ +package io.sharedstreets.matcher.ingest.input; + +import com.google.gson.Gson; + +import java.io.*; + +public class JsonParser { + + static JsonDTO parseJson(final String pathToFile) { + JsonDTO result = null; + try (BufferedReader reader = new BufferedReader(new FileReader(new File(pathToFile)))) { + + Gson gson = new Gson(); + StringBuilder builder = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + builder.append(line); + } + reader.close(); + + result = gson.fromJson(builder.toString(), JsonDTO.class); + } catch (IOException e) { + e.printStackTrace(); + } + + return result; + } +} diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/gpx/EventTypeDTO.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/gpx/EventTypeDTO.java new file mode 100644 index 0000000..791c90f --- /dev/null +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/gpx/EventTypeDTO.java @@ -0,0 +1,21 @@ +package io.sharedstreets.matcher.ingest.input.gpx; + +import com.google.gson.annotations.SerializedName; + +public class EventTypeDTO { + + @SerializedName("vehicleId") + public String vehicleId; + + @SerializedName("timeStamp") + public Long timeStamp; + + @SerializedName("latitude") + public double latitude; + + @SerializedName("longitude") + public double longitude; + + @SerializedName("eventType") + public String eventType; +} \ No newline at end of file From 2b8882f97e857d7632c07ce9f7f5ff73eead3971 Mon Sep 17 00:00:00 2001 From: Jason Cromer Date: Wed, 13 Feb 2019 17:04:08 -0800 Subject: [PATCH 2/7] Clean up Json parsing and handle no file case gracefully --- .../matcher/ingest/Ingester.java | 176 +++++++++--------- .../ingest/input/JsonEventExtractor.java | 69 ------- 
.../matcher/ingest/input/JsonParser.java | 28 --- .../ingest/input/gpx/EventTypeDTO.java | 21 --- .../input/{ => json}/JsonInputFormat.java | 58 +++--- .../matcher/ingest/input/json/JsonParser.java | 39 ++++ .../JsonInputObject.java} | 11 +- 7 files changed, 158 insertions(+), 244 deletions(-) delete mode 100644 ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonEventExtractor.java delete mode 100644 ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonParser.java delete mode 100644 ingest/src/main/java/io/sharedstreets/matcher/ingest/input/gpx/EventTypeDTO.java rename ingest/src/main/java/io/sharedstreets/matcher/ingest/input/{ => json}/JsonInputFormat.java (54%) create mode 100644 ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonParser.java rename ingest/src/main/java/io/sharedstreets/matcher/ingest/{input/JsonDTO.java => model/JsonInputObject.java} (65%) diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java index 90f66ef..7dee547 100644 --- a/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java @@ -2,20 +2,16 @@ import io.sharedstreets.matcher.ingest.input.CsvEventExtractor; import io.sharedstreets.matcher.ingest.input.DcfhvEventExtractor; -import io.sharedstreets.matcher.ingest.input.JsonEventExtractor; -import io.sharedstreets.matcher.ingest.input.JsonInputFormat; +import io.sharedstreets.matcher.ingest.input.json.JsonInputFormat; import io.sharedstreets.matcher.ingest.input.gpx.GpxInputFormat; import io.sharedstreets.matcher.ingest.model.Ingest; import io.sharedstreets.matcher.ingest.model.InputEvent; -import io.sharedstreets.matcher.ingest.util.TileId; import org.apache.commons.cli.*; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; import 
org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +22,25 @@ public class Ingester { - static Logger logger = LoggerFactory.getLogger(Ingester.class); + public enum FileType { + CSV("CSV"), + JSON("JSON"), + GPX("GPX"), + DCFHV("DCFHV"); + + private final String stringValue; + + FileType(String csv) { + stringValue = csv; + } + + @Override + public String toString() { + return stringValue; + } + } + + private static Logger logger = LoggerFactory.getLogger(Ingester.class); public static void main(String[] args) throws Exception { // create the command line parser @@ -35,88 +49,83 @@ public static void main(String[] args) throws Exception { // create the Options Options options = new Options(); - options.addOption( OptionBuilder.withLongOpt( "input" ) - .withDescription( "path to input files" ) + options.addOption(OptionBuilder.withLongOpt("input") + .withDescription("path to input files") .hasArg() .withArgName("INPUT-DIR") - .create() ); + .create()); - options.addOption( OptionBuilder.withLongOpt( "type" ) - .withDescription( "input type, supports: [CSV, JSON, GPX]" ) + options.addOption(OptionBuilder.withLongOpt("type") + .withDescription("input type, supports: [CSV, JSON, GPX]") .hasArg() .withArgName("INPUT-DIR") - .create() ); + .create()); - options.addOption( OptionBuilder.withLongOpt( "output" ) - .withDescription( "path to output (will be overwritten)" ) + options.addOption(OptionBuilder.withLongOpt("output") + .withDescription("path to output (will be overwritten)") .hasArg() .withArgName("OUTPUT-DIR") - .create() ); - - + .create()); options.addOption("speeds", "track GPS speed when available"); - options.addOption("verbose", "verbose error output" ); + options.addOption("verbose", "verbose error output"); String 
inputPath = ""; - String outputPath = ""; - String inputType = ""; boolean verbose = false; - - boolean gpsSpeeds = false; try { // parse the command line arguments - CommandLine line = parser.parse( options, args ); - + CommandLine line = parser.parse(options, args); - - if( line.hasOption( "input" ) ) { + if (line.hasOption("input")) { // print the value of block-size - inputPath = line.getOptionValue( "input" ); + inputPath = line.getOptionValue("input"); } - if( line.hasOption( "output" ) ) { + if (line.hasOption("output")) { // print the value of block-size - outputPath = line.getOptionValue( "output" ); + outputPath = line.getOptionValue("output"); } - if( line.hasOption( "speeds" ) ) { + if (line.hasOption("speeds")) { // print the value of block-size gpsSpeeds = true; } - if( line.hasOption( "verbose" ) ) { + if (line.hasOption("verbose")) { verbose = true; } - if( line.hasOption( "type" ) ) { + if (line.hasOption("type")) { // print the value of block-size - inputType = line.getOptionValue( "type" ).trim().toUpperCase(); - } - else { - String fileParts[] = inputPath.split("\\."); - if(fileParts[fileParts.length-1].toLowerCase().equals("csv")) - inputType = "CSV"; - else if(fileParts[fileParts.length-1].toLowerCase().equals("json")) - inputType = "JSON"; - else if(fileParts[fileParts.length-1].toLowerCase().equals("gpx")) - inputType = "GPX"; - else if(fileParts[fileParts.length-1].toLowerCase().equals("dcfhv")) - inputType = "DCFHV"; + inputType = line.getOptionValue("type").trim().toUpperCase(); + } else { + String[] fileParts = inputPath.split("\\."); + switch (fileParts[fileParts.length - 1].toLowerCase()) { + case "csv": + inputType = FileType.CSV.toString(); + break; + case "json": + inputType = FileType.JSON.toString(); + break; + case "gpx": + inputType = FileType.GPX.toString(); + break; + case "dcfhv": + inputType = FileType.DCFHV.toString(); + break; + } } - - } - catch( Exception exp ) { + } catch (Exception exp) { HelpFormatter formatter = new 
HelpFormatter(); - formatter.printHelp( "integster", options ); + formatter.printHelp("integster", options); - System.out.println( "Unexpected exception:" + exp.getMessage() ); + System.out.println("Unexpected exception:" + exp.getMessage()); return; } @@ -124,55 +133,46 @@ else if(fileParts[fileParts.length-1].toLowerCase().equals("dcfhv")) final boolean finalVerbose = verbose; // let's go... - logger.info("Starting up streams..."); - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - if(inputPath == null || inputPath.trim().isEmpty()) { + if (inputPath == null || inputPath.trim().isEmpty()) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp( "integster", options ); + formatter.printHelp("integster", options); return; } // process events // step 1: read strings (blocks of location events from file) - DataSet inputEvents = null; - - if (finalInputType.equals("GPX")) { - inputEvents = env.createInput(new GpxInputFormat(inputPath, gpsSpeeds, finalVerbose)); - } else if (finalInputType.equals("JSON")) { - inputEvents = env.createInput(new JsonInputFormat(new org.apache.flink.core.fs.Path(inputPath))); - } - else { - DataSet inputStream = env.readTextFile(inputPath); - - // open text based file formats and map strings to extractor methods - inputEvents = inputStream.flatMap(new FlatMapFunction() { - - @Override - public void flatMap(String value, Collector out) throws Exception { - - if (finalInputType.equals("CSV")) { - List inputEvents = CsvEventExtractor.extractEvents(value, finalVerbose); - - for (InputEvent inputEvent : inputEvents) + DataSet inputEvents; + + switch (FileType.valueOf(finalInputType)) { + case GPX: + inputEvents = env.createInput(new GpxInputFormat(inputPath, gpsSpeeds, finalVerbose)); + break; + case JSON: + inputEvents = env.createInput(new JsonInputFormat(new org.apache.flink.core.fs.Path(inputPath))); + break; + default: + DataSet inputStream = env.readTextFile(inputPath); + + // open text based 
file formats and map strings to extractor methods + inputEvents = inputStream.flatMap((FlatMapFunction) (value, out) -> { + if (finalInputType.equals(FileType.CSV.toString())) { + List inputEvents1 = CsvEventExtractor.extractEvents(value, finalVerbose); + + for (InputEvent inputEvent : inputEvents1) { out.collect(inputEvent); + } + } else if (finalInputType.equals(FileType.DCFHV.toString())) { + List inputEvents1 = DcfhvEventExtractor.extractEvents(value, finalVerbose); - } else if (finalInputType.equals("JSON")) { - List inputEvents = JsonEventExtractor.extractEvents(value, finalVerbose); - - for (InputEvent inputEvent : inputEvents) - out.collect(inputEvent); - } else if (finalInputType.equals("DCFHV")) { - List inputEvents = DcfhvEventExtractor.extractEvents(value, finalVerbose); - - for (InputEvent inputEvent : inputEvents) + for (InputEvent inputEvent : inputEvents1) { out.collect(inputEvent); + } } - } - }); + }); } // // create list of map tiles for input traces @@ -206,10 +206,9 @@ public void flatMap(String value, Collector out) throws Exception { // Path dataPath = Paths.get(outputPath, "event_data").toAbsolutePath(); -// if(dataPath.toFile().exists()) { -// System.out.print("File already exists: " + outputPath.toString()); -// return; -// } + if (dataPath.toFile().exists()) { + System.out.print("File already exists: " + outputPath + " \n...Overwriting\n"); + } // write protobuf of traces inputEvents.write(new FileOutputFormat() { @@ -221,6 +220,5 @@ public void writeRecord(InputEvent record) throws IOException { }, dataPath.toString(), FileSystem.WriteMode.OVERWRITE).setParallelism(1); env.execute("process"); - } } diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonEventExtractor.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonEventExtractor.java deleted file mode 100644 index 17a2df5..0000000 --- a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonEventExtractor.java +++ /dev/null @@ -1,69 +0,0 
@@ -package io.sharedstreets.matcher.ingest.input; - - -import com.jsoniter.JsonIterator; -import io.sharedstreets.matcher.ingest.model.Point; -import io.sharedstreets.matcher.ingest.model.InputEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -public class JsonEventExtractor { - - static Logger logger = LoggerFactory.getLogger(JsonEventExtractor.class); - - public static List extractEvents(String value, boolean verbose) throws IOException { - - ArrayList list = new ArrayList(); - - try { - - InputEvent event = new InputEvent(); - event.point = new Point(); - - JsonIterator iter = JsonIterator.parse(value); - - String eventType = null; - Double eventValue = null; - - for (String field = iter.readObject(); field != null; field = iter.readObject()) { - - if(field.equals("timestamp")) - event.time = iter.readLong() * 1000; - else if(field.equals("longitude")) - event.point.lon = Double.parseDouble(iter.readString()); - else if(field.equals("latitude")) - event.point.lat = Double.parseDouble(iter.readString()); - else if(field.equals("id")) - event.vehicleId = iter.readString(); - else if(field.equals("eventType")) - eventType = iter.readString(); - else if(field.equals("eventValue")) - eventValue = iter.readDouble(); - else - iter.read(); // no-op - } - - if(eventType != null && !eventType.trim().equals("")) { - event.eventData = new HashMap<>(); - event.eventData.put(eventType, eventValue); - } - - // single event per line - // TODO add grouped trace in JSON format - - list.add(event); - } - catch(Exception e) { - - logger.error("Unable to parse line: " + value); - - } - - return list; - } -} diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonParser.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonParser.java deleted file mode 100644 index 3856f97..0000000 --- 
a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonParser.java +++ /dev/null @@ -1,28 +0,0 @@ -package io.sharedstreets.matcher.ingest.input; - -import com.google.gson.Gson; - -import java.io.*; - -public class JsonParser { - - static JsonDTO parseJson(final String pathToFile) { - JsonDTO result = null; - try (BufferedReader reader = new BufferedReader(new FileReader(new File(pathToFile)))) { - - Gson gson = new Gson(); - StringBuilder builder = new StringBuilder(); - String line; - while ((line = reader.readLine()) != null) { - builder.append(line); - } - reader.close(); - - result = gson.fromJson(builder.toString(), JsonDTO.class); - } catch (IOException e) { - e.printStackTrace(); - } - - return result; - } -} diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/gpx/EventTypeDTO.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/gpx/EventTypeDTO.java deleted file mode 100644 index 791c90f..0000000 --- a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/gpx/EventTypeDTO.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.sharedstreets.matcher.ingest.input.gpx; - -import com.google.gson.annotations.SerializedName; - -public class EventTypeDTO { - - @SerializedName("vehicleId") - public String vehicleId; - - @SerializedName("timeStamp") - public Long timeStamp; - - @SerializedName("latitude") - public double latitude; - - @SerializedName("longitude") - public double longitude; - - @SerializedName("eventType") - public String eventType; -} \ No newline at end of file diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonInputFormat.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java similarity index 54% rename from ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonInputFormat.java rename to ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java index eb7ff7d..be034a5 100644 --- 
a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonInputFormat.java +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java @@ -1,6 +1,7 @@ -package io.sharedstreets.matcher.ingest.input; +package io.sharedstreets.matcher.ingest.input.json; import io.sharedstreets.matcher.ingest.model.InputEvent; +import io.sharedstreets.matcher.ingest.model.JsonInputObject; import io.sharedstreets.matcher.ingest.model.Point; import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.core.fs.FileInputSplit; @@ -12,17 +13,14 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; public class JsonInputFormat extends FileInputFormat { - private final AtomicInteger numFiles = new AtomicInteger(0); - private final AtomicInteger currentFile = new AtomicInteger(0); - private static AtomicReference fileContents = new AtomicReference<>(new JsonDTO()); + private final AtomicInteger numEvents = new AtomicInteger(0); + private final AtomicInteger currentEventIndex = new AtomicInteger(0); + private static JsonInputObject parsedFileContents = new JsonInputObject(); public JsonInputFormat(Path filePath) { super(filePath); @@ -37,27 +35,22 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { // take the desired number of splits into account minNumSplits = Math.max(minNumSplits, this.numSplits); - final Path path = this.filePath; - final List inputSplits = new ArrayList<>(minNumSplits); - - final FileSystem fs = this.filePath.getFileSystem(); - int splitNum = 0; - File dir = new File(path.getPath()); + final FileSystem fs = filePath.getFileSystem(); + File dir = new File(filePath.getPath()); File[] files = dir.listFiles(); if (files != null) { - for (File f : files) { + int splitNum = 0; + for (File f : 
files) { FileStatus file = new LocalFileStatus(f, fs); - FileInputSplit split = new FileInputSplit(splitNum++, file.getPath(), 0, file.getLen(), - null); + FileInputSplit split = new FileInputSplit(splitNum++, file.getPath(), 0, file.getLen(), null); inputSplits.add(split); - } } - return inputSplits.toArray(new FileInputSplit[inputSplits.size()]); + return inputSplits.toArray(new FileInputSplit[0]); } @Override @@ -65,34 +58,35 @@ public void open(FileInputSplit fileSplit) throws IOException { super.open(fileSplit); String pathToFile = filePath + "/" + fileSplit.getPath().getName(); - JsonDTO result = JsonParser.parseJson(pathToFile); - fileContents = new AtomicReference<>(result); - numFiles.compareAndSet(0, result.eventData.size()); + JsonInputObject result = JsonParser.parseJson(pathToFile); + + if (result != null) { + parsedFileContents = result; + numEvents.compareAndSet(0, result.eventData.size()); + } } @Override public boolean reachedEnd() { - return currentFile.get() >= numFiles.get(); + return currentEventIndex.get() >= numEvents.get(); } @Override public InputEvent nextRecord(InputEvent reuse) { - int index = foo(); + int index = incrementEventIndex(); - reuse.vehicleId = fileContents.get().eventData.get(index).vehicleId; - reuse.time = fileContents.get().eventData.get(index).timeStamp; - reuse.point = new Point(fileContents.get().eventData.get(index).longitude, fileContents.get().eventData.get(index).latitude); + reuse.vehicleId = parsedFileContents.eventData.get(index).vehicleId; + reuse.time = parsedFileContents.eventData.get(index).timeStamp; + reuse.point = new Point(parsedFileContents.eventData.get(index).longitude, parsedFileContents.eventData.get(index).latitude); - if (fileContents.get().eventData.get(index).eventType != null) { - HashMap event = new HashMap<>(); - event.put(fileContents.get().eventData.get(index).eventType, 0.0); - reuse.eventData = event; + if (parsedFileContents.eventData.get(index).eventType != null) { + reuse.eventData = 
parsedFileContents.eventData.get(index).eventType; } return reuse; } - synchronized int foo() { - return currentFile.getAndIncrement(); + private synchronized int incrementEventIndex() { + return currentEventIndex.getAndIncrement(); } } diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonParser.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonParser.java new file mode 100644 index 0000000..05300a9 --- /dev/null +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonParser.java @@ -0,0 +1,39 @@ +package io.sharedstreets.matcher.ingest.input.json; + +import com.google.gson.Gson; +import io.sharedstreets.matcher.ingest.model.JsonInputObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; + +class JsonParser { + + private static Logger logger = LoggerFactory.getLogger(JsonParser.class); + + static JsonInputObject parseJson(final String pathToFile) { + if (pathToFile == null || pathToFile.isEmpty()) { + logger.error("Path is empty or null when parsing Json File"); + return null; + } + JsonInputObject result = null; + + try (BufferedReader reader = new BufferedReader(new FileReader(new File(pathToFile)))) { + + Gson gson = new Gson(); + StringBuilder builder = new StringBuilder(); + String line; + + while ((line = reader.readLine()) != null) { + builder.append(line); + } + reader.close(); + + result = gson.fromJson(builder.toString(), JsonInputObject.class); + } catch (IOException e) { + e.printStackTrace(); + } + + return result; + } +} diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonDTO.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/model/JsonInputObject.java similarity index 65% rename from ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonDTO.java rename to ingest/src/main/java/io/sharedstreets/matcher/ingest/model/JsonInputObject.java index 015c59d..c2666d7 100644 --- 
a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonDTO.java +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/model/JsonInputObject.java @@ -1,15 +1,16 @@ -package io.sharedstreets.matcher.ingest.input; +package io.sharedstreets.matcher.ingest.model; import com.google.gson.annotations.SerializedName; +import java.util.HashMap; import java.util.List; -public class JsonDTO { +public class JsonInputObject { @SerializedName("eventData") - List eventData; + public List eventData; - public class EventTypeDTO { + public class JsonEventObject { @SerializedName("vehicleId") public String vehicleId; @@ -24,6 +25,6 @@ public class EventTypeDTO { public double longitude; @SerializedName("eventType") - public String eventType; + public HashMap eventType; } } From 98c952c2d844a667547666780a273e8636e0cfc8 Mon Sep 17 00:00:00 2001 From: Jason Cromer Date: Wed, 13 Feb 2019 17:21:02 -0800 Subject: [PATCH 3/7] Add sample JSON data --- sample_data/json/testData.json | 40 +++++++++++++++++++++++++++++++++ sample_data/json/testData2.json | 40 +++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 sample_data/json/testData.json create mode 100644 sample_data/json/testData2.json diff --git a/sample_data/json/testData.json b/sample_data/json/testData.json new file mode 100644 index 0000000..352b267 --- /dev/null +++ b/sample_data/json/testData.json @@ -0,0 +1,40 @@ +{ + "eventData": [ + { + "vehicleId":"test_trace", + "timeStamp":1550021697198, + "latitude":32.981278, + "longitude":-96.70976413, + "eventType": { + "PICKUP":0.0 + } + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021707198, + "latitude":32.98110025, + "longitude":-96.709954083 + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021717198, + "latitude":32.98100383, + "longitude":-96.71010795 + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021727198, + "latitude":32.98092837, + "longitude":-96.71021082 + }, + { + "vehicleId":"test_trace", + 
"timeStamp":1550021757198, + "latitude":32.98087908, + "longitude":-96.71030881, + "eventType": { + "DROPOFF":0.0 + } + } + ] +} \ No newline at end of file diff --git a/sample_data/json/testData2.json b/sample_data/json/testData2.json new file mode 100644 index 0000000..352b267 --- /dev/null +++ b/sample_data/json/testData2.json @@ -0,0 +1,40 @@ +{ + "eventData": [ + { + "vehicleId":"test_trace", + "timeStamp":1550021697198, + "latitude":32.981278, + "longitude":-96.70976413, + "eventType": { + "PICKUP":0.0 + } + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021707198, + "latitude":32.98110025, + "longitude":-96.709954083 + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021717198, + "latitude":32.98100383, + "longitude":-96.71010795 + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021727198, + "latitude":32.98092837, + "longitude":-96.71021082 + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021757198, + "latitude":32.98087908, + "longitude":-96.71030881, + "eventType": { + "DROPOFF":0.0 + } + } + ] +} \ No newline at end of file From e72f240d1a6061e2e8c67b8dc471a93f0174fad4 Mon Sep 17 00:00:00 2001 From: Jason Cromer Date: Wed, 13 Feb 2019 17:32:43 -0800 Subject: [PATCH 4/7] Rename trace data sample files and update README --- README.md | 12 ++++++++++++ .../json/{testData2.json => json_trace1.json} | 19 ++++++++++++++----- .../json/{testData.json => json_trace2.json} | 0 3 files changed, 26 insertions(+), 5 deletions(-) rename sample_data/json/{testData2.json => json_trace1.json} (68%) rename sample_data/json/{testData.json => json_trace2.json} (100%) diff --git a/README.md b/README.md index 4d84aac..a0fa85a 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,18 @@ Command to load CSV data: java -jar [path/to]/ingest-1.1.jar --input sample_data/csv/csv_trace1.csv --output csv_gpx/ --type csv ``` +#### JSON data + +Ingest tool imports JSON data using the format specified in the `sample_data/json/` sample files labeled 
`json_trace1.json` and `json_trace2.json`. +The file `json_trace2.json` is the same trace as `json_trace1.json`, but contains a "PICKUP" and "DROPOFF" event. + +A single JSON file can contain multiple traces, as long as each vehicle is uniquely identified within the file(s). The order of the records in the JSON file does not impact map matching, as records are sorted by time for each group of vehicle IDs. + +Command to load JSON data: + +``` +java -jar [path/to]/ingest-1.1.jar --input sample_data/json/json_trace1.json --output json_gpx/ --type json +``` ![Traffic Map](docs/images/pickup_event_trace.png) diff --git a/sample_data/json/testData2.json b/sample_data/json/json_trace1.json similarity index 68% rename from sample_data/json/testData2.json rename to sample_data/json/json_trace1.json index 352b267..8721958 100644 --- a/sample_data/json/testData2.json +++ b/sample_data/json/json_trace1.json @@ -6,26 +6,35 @@ "latitude":32.981278, "longitude":-96.70976413, "eventType": { - "PICKUP":0.0 + "gpsSpeed":12.310797 } }, { "vehicleId":"test_trace", "timeStamp":1550021707198, "latitude":32.98110025, - "longitude":-96.709954083 + "longitude":-96.709954083, + "eventType": { + "gpsSpeed":11.960151 + } }, { "vehicleId":"test_trace", "timeStamp":1550021717198, "latitude":32.98100383, - "longitude":-96.71010795 + "longitude":-96.71010795, + "eventType": { + "gpsSpeed":9.841646 + } }, { "vehicleId":"test_trace", "timeStamp":1550021727198, "latitude":32.98092837, - "longitude":-96.71021082 + "longitude":-96.71021082, + "eventType": { + "gpsSpeed":5.802207 + } }, { "vehicleId":"test_trace", @@ -33,7 +42,7 @@ "latitude":32.98087908, "longitude":-96.71030881, "eventType": { - "DROPOFF":0.0 + "gpsSpeed":3.620221 } } ] diff --git a/sample_data/json/testData.json b/sample_data/json/json_trace2.json similarity index 100% rename from sample_data/json/testData.json rename to sample_data/json/json_trace2.json From c237f0220c55e6ec23f37ed731e49e682e3b0ecf Mon Sep 17 00:00:00 2001 From: 
Jason Cromer Date: Wed, 13 Feb 2019 17:36:48 -0800 Subject: [PATCH 5/7] Rename list variables for clarity. --- gradlew | 6 +++--- .../java/io/sharedstreets/matcher/ingest/Ingester.java | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/gradlew b/gradlew index 4453cce..cccdd3d 100755 --- a/gradlew +++ b/gradlew @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS="" # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD="maximum" -warn ( ) { +warn () { echo "$*" } -die ( ) { +die () { echo echo "$*" echo @@ -155,7 +155,7 @@ if $cygwin ; then fi # Escape application args -save ( ) { +save () { for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done echo " " } diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java index 7dee547..917087b 100644 --- a/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java @@ -160,15 +160,15 @@ public static void main(String[] args) throws Exception { // open text based file formats and map strings to extractor methods inputEvents = inputStream.flatMap((FlatMapFunction) (value, out) -> { if (finalInputType.equals(FileType.CSV.toString())) { - List inputEvents1 = CsvEventExtractor.extractEvents(value, finalVerbose); + List csvInputEvents = CsvEventExtractor.extractEvents(value, finalVerbose); - for (InputEvent inputEvent : inputEvents1) { + for (InputEvent inputEvent : csvInputEvents) { out.collect(inputEvent); } } else if (finalInputType.equals(FileType.DCFHV.toString())) { - List inputEvents1 = DcfhvEventExtractor.extractEvents(value, finalVerbose); + List dcfhvInputEvents = DcfhvEventExtractor.extractEvents(value, finalVerbose); - for (InputEvent inputEvent : inputEvents1) { + for (InputEvent inputEvent : dcfhvInputEvents) { out.collect(inputEvent); } } From c03807a054c5bdb57d896892229a7741d6a8ae24 Mon Sep 17 
00:00:00 2001 From: Jason Cromer Date: Wed, 13 Feb 2019 20:16:47 -0800 Subject: [PATCH 6/7] Make parsed file transient and use `currentSplit` in the JsonInputFormat to correctly parse with parallelism --- .../matcher/ingest/input/json/JsonInputFormat.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java index be034a5..6688df0 100644 --- a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java @@ -20,7 +20,7 @@ public class JsonInputFormat extends FileInputFormat { private final AtomicInteger numEvents = new AtomicInteger(0); private final AtomicInteger currentEventIndex = new AtomicInteger(0); - private static JsonInputObject parsedFileContents = new JsonInputObject(); + private transient JsonInputObject parsedFileContents = new JsonInputObject(); public JsonInputFormat(Path filePath) { super(filePath); @@ -56,7 +56,7 @@ public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { @Override public void open(FileInputSplit fileSplit) throws IOException { super.open(fileSplit); - String pathToFile = filePath + "/" + fileSplit.getPath().getName(); + String pathToFile = filePath + "/" + currentSplit.getPath().getName(); JsonInputObject result = JsonParser.parseJson(pathToFile); From d25469224151e4c53cc7d19c8864c5858b896b0e Mon Sep 17 00:00:00 2001 From: Jason Cromer Date: Thu, 7 Mar 2019 11:31:32 -0800 Subject: [PATCH 7/7] use compile instead of implementation to include GSON in jar --- ingest/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ingest/build.gradle b/ingest/build.gradle index 8de2d37..afeecb0 100644 --- a/ingest/build.gradle +++ b/ingest/build.gradle @@ -25,7 +25,7 @@ dependencies { compile 
"com.google.protobuf:protobuf-java:${protobufversion}" compile "com.jsoniter:jsoniter:0.9.15" compile 'commons-cli:commons-cli:1.4' - implementation 'com.google.code.gson:gson:2.8.5' + compile 'com.google.code.gson:gson:2.8.5' compile 'com.esri.geometry:esri-geometry-api:1.2.1' compile 'joda-time:joda-time:2.9.9'