diff --git a/README.md b/README.md index 4d84aac..a0fa85a 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,18 @@ Command to load CSV data: java -jar [path/to]/ingest-1.1.jar --input sample_data/csv/csv_trace1.csv --output csv_gpx/ --type csv ``` +#### JSON data + +Ingest tool imports JSON data using the format specified in the `sample_data/json/` sample files labeled `json_trace1.json` and `json_trace2.json`. +The file `json_trace2.json` is the same trace as `json_trace1.json`, but contains "PICKUP" and "DROPOFF" events. + +A single JSON file can contain multiple traces, as long as each vehicle is uniquely identified within the file(s). The order of the records in the JSON file does not impact map matching, as records are sorted by time for each group of vehicle IDs. + +Command to load JSON data: + +``` +java -jar [path/to]/ingest-1.1.jar --input sample_data/json/json_trace1.json --output json_gpx/ --type json +``` ![Traffic Map](docs/images/pickup_event_trace.png) diff --git a/gradlew b/gradlew index 4453cce..cccdd3d 100755 --- a/gradlew +++ b/gradlew @@ -33,11 +33,11 @@ DEFAULT_JVM_OPTS="" # Use the maximum available, or set MAX_FD != -1 to use that value. 
MAX_FD="maximum" -warn ( ) { +warn () { echo "$*" } -die ( ) { +die () { echo echo "$*" echo @@ -155,7 +155,7 @@ if $cygwin ; then fi # Escape application args -save ( ) { +save () { for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done echo " " } diff --git a/ingest/build.gradle b/ingest/build.gradle index 9fc160d..afeecb0 100644 --- a/ingest/build.gradle +++ b/ingest/build.gradle @@ -25,6 +25,7 @@ dependencies { compile "com.google.protobuf:protobuf-java:${protobufversion}" compile "com.jsoniter:jsoniter:0.9.15" compile 'commons-cli:commons-cli:1.4' + compile 'com.google.code.gson:gson:2.8.5' compile 'com.esri.geometry:esri-geometry-api:1.2.1' compile 'joda-time:joda-time:2.9.9' diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java index 116abb1..917087b 100644 --- a/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/Ingester.java @@ -2,18 +2,16 @@ import io.sharedstreets.matcher.ingest.input.CsvEventExtractor; import io.sharedstreets.matcher.ingest.input.DcfhvEventExtractor; -import io.sharedstreets.matcher.ingest.input.JsonEventExtractor; +import io.sharedstreets.matcher.ingest.input.json.JsonInputFormat; import io.sharedstreets.matcher.ingest.input.gpx.GpxInputFormat; import io.sharedstreets.matcher.ingest.model.Ingest; import io.sharedstreets.matcher.ingest.model.InputEvent; -import io.sharedstreets.matcher.ingest.util.TileId; import org.apache.commons.cli.*; import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.io.FileOutputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.util.Collector; +import org.apache.flink.core.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
@@ -24,7 +22,25 @@ public class Ingester { - static Logger logger = LoggerFactory.getLogger(Ingester.class); + public enum FileType { + CSV("CSV"), + JSON("JSON"), + GPX("GPX"), + DCFHV("DCFHV"); + + private final String stringValue; + + FileType(String csv) { + stringValue = csv; + } + + @Override + public String toString() { + return stringValue; + } + } + + private static Logger logger = LoggerFactory.getLogger(Ingester.class); public static void main(String[] args) throws Exception { // create the command line parser @@ -33,88 +49,83 @@ public static void main(String[] args) throws Exception { // create the Options Options options = new Options(); - options.addOption( OptionBuilder.withLongOpt( "input" ) - .withDescription( "path to input files" ) + options.addOption(OptionBuilder.withLongOpt("input") + .withDescription("path to input files") .hasArg() .withArgName("INPUT-DIR") - .create() ); + .create()); - options.addOption( OptionBuilder.withLongOpt( "type" ) - .withDescription( "input type, supports: [CSV, JSON, GPX]" ) + options.addOption(OptionBuilder.withLongOpt("type") + .withDescription("input type, supports: [CSV, JSON, GPX]") .hasArg() .withArgName("INPUT-DIR") - .create() ); + .create()); - options.addOption( OptionBuilder.withLongOpt( "output" ) - .withDescription( "path to output (will be overwritten)" ) + options.addOption(OptionBuilder.withLongOpt("output") + .withDescription("path to output (will be overwritten)") .hasArg() .withArgName("OUTPUT-DIR") - .create() ); - - + .create()); options.addOption("speeds", "track GPS speed when available"); - options.addOption("verbose", "verbose error output" ); + options.addOption("verbose", "verbose error output"); String inputPath = ""; - String outputPath = ""; - String inputType = ""; boolean verbose = false; - - boolean gpsSpeeds = false; try { // parse the command line arguments - CommandLine line = parser.parse( options, args ); + CommandLine line = parser.parse(options, args); - - - if( 
line.hasOption( "input" ) ) { + if (line.hasOption("input")) { // print the value of block-size - inputPath = line.getOptionValue( "input" ); + inputPath = line.getOptionValue("input"); } - if( line.hasOption( "output" ) ) { + if (line.hasOption("output")) { // print the value of block-size - outputPath = line.getOptionValue( "output" ); + outputPath = line.getOptionValue("output"); } - if( line.hasOption( "speeds" ) ) { + if (line.hasOption("speeds")) { // print the value of block-size gpsSpeeds = true; } - if( line.hasOption( "verbose" ) ) { + if (line.hasOption("verbose")) { verbose = true; } - if( line.hasOption( "type" ) ) { + if (line.hasOption("type")) { // print the value of block-size - inputType = line.getOptionValue( "type" ).trim().toUpperCase(); - } - else { - String fileParts[] = inputPath.split("\\."); - if(fileParts[fileParts.length-1].toLowerCase().equals("csv")) - inputType = "CSV"; - else if(fileParts[fileParts.length-1].toLowerCase().equals("json")) - inputType = "JSON"; - else if(fileParts[fileParts.length-1].toLowerCase().equals("gpx")) - inputType = "GPX"; - else if(fileParts[fileParts.length-1].toLowerCase().equals("dcfhv")) - inputType = "DCFHV"; + inputType = line.getOptionValue("type").trim().toUpperCase(); + } else { + String[] fileParts = inputPath.split("\\."); + switch (fileParts[fileParts.length - 1].toLowerCase()) { + case "csv": + inputType = FileType.CSV.toString(); + break; + case "json": + inputType = FileType.JSON.toString(); + break; + case "gpx": + inputType = FileType.GPX.toString(); + break; + case "dcfhv": + inputType = FileType.DCFHV.toString(); + break; + } } - - } - catch( Exception exp ) { + } catch (Exception exp) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp( "integster", options ); + formatter.printHelp("integster", options); - System.out.println( "Unexpected exception:" + exp.getMessage() ); + System.out.println("Unexpected exception:" + exp.getMessage()); return; } @@ -122,53 +133,46 @@ 
else if(fileParts[fileParts.length-1].toLowerCase().equals("dcfhv")) final boolean finalVerbose = verbose; // let's go... - logger.info("Starting up streams..."); - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - if(inputPath == null || inputPath.trim().isEmpty()) { + if (inputPath == null || inputPath.trim().isEmpty()) { HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp( "integster", options ); + formatter.printHelp("integster", options); return; } // process events // step 1: read strings (blocks of location events from file) - DataSet inputEvents = null; - - if (finalInputType.equals("GPX")) { - inputEvents = env.createInput(new GpxInputFormat(inputPath, gpsSpeeds, finalVerbose)); - } - else { - DataSet inputStream = env.readTextFile(inputPath); - - // open text based file formats and map strings to extractor methods - inputEvents = inputStream.flatMap(new FlatMapFunction() { - - @Override - public void flatMap(String value, Collector out) throws Exception { - - if (finalInputType.equals("CSV")) { - List inputEvents = CsvEventExtractor.extractEvents(value, finalVerbose); - - for (InputEvent inputEvent : inputEvents) - out.collect(inputEvent); - - } else if (finalInputType.equals("JSON")) { - List inputEvents = JsonEventExtractor.extractEvents(value, finalVerbose); - - for (InputEvent inputEvent : inputEvents) + DataSet inputEvents; + + switch (FileType.valueOf(finalInputType)) { + case GPX: + inputEvents = env.createInput(new GpxInputFormat(inputPath, gpsSpeeds, finalVerbose)); + break; + case JSON: + inputEvents = env.createInput(new JsonInputFormat(new org.apache.flink.core.fs.Path(inputPath))); + break; + default: + DataSet inputStream = env.readTextFile(inputPath); + + // open text based file formats and map strings to extractor methods + inputEvents = inputStream.flatMap((FlatMapFunction) (value, out) -> { + if (finalInputType.equals(FileType.CSV.toString())) { + List csvInputEvents = 
CsvEventExtractor.extractEvents(value, finalVerbose); + + for (InputEvent inputEvent : csvInputEvents) { out.collect(inputEvent); - } else if (finalInputType.equals("DCFHV")) { - List inputEvents = DcfhvEventExtractor.extractEvents(value, finalVerbose); + } + } else if (finalInputType.equals(FileType.DCFHV.toString())) { + List dcfhvInputEvents = DcfhvEventExtractor.extractEvents(value, finalVerbose); - for (InputEvent inputEvent : inputEvents) + for (InputEvent inputEvent : dcfhvInputEvents) { out.collect(inputEvent); + } } - } - }); + }); } // // create list of map tiles for input traces @@ -202,9 +206,8 @@ public void flatMap(String value, Collector out) throws Exception { // Path dataPath = Paths.get(outputPath, "event_data").toAbsolutePath(); - if(dataPath.toFile().exists()) { - System.out.print("File already exists: " + outputPath.toString()); - return; + if (dataPath.toFile().exists()) { + System.out.print("File already exists: " + outputPath + " \n...Overwriting\n"); } // write protobuf of traces @@ -214,9 +217,8 @@ public void writeRecord(InputEvent record) throws IOException { Ingest.InputEventProto proto = record.toProto(); proto.writeDelimitedTo(this.stream); } - }, dataPath.toString()).setParallelism(1); + }, dataPath.toString(), FileSystem.WriteMode.OVERWRITE).setParallelism(1); env.execute("process"); - } } diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonEventExtractor.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonEventExtractor.java deleted file mode 100644 index 17a2df5..0000000 --- a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/JsonEventExtractor.java +++ /dev/null @@ -1,69 +0,0 @@ -package io.sharedstreets.matcher.ingest.input; - - -import com.jsoniter.JsonIterator; -import io.sharedstreets.matcher.ingest.model.Point; -import io.sharedstreets.matcher.ingest.model.InputEvent; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import 
java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -public class JsonEventExtractor { - - static Logger logger = LoggerFactory.getLogger(JsonEventExtractor.class); - - public static List extractEvents(String value, boolean verbose) throws IOException { - - ArrayList list = new ArrayList(); - - try { - - InputEvent event = new InputEvent(); - event.point = new Point(); - - JsonIterator iter = JsonIterator.parse(value); - - String eventType = null; - Double eventValue = null; - - for (String field = iter.readObject(); field != null; field = iter.readObject()) { - - if(field.equals("timestamp")) - event.time = iter.readLong() * 1000; - else if(field.equals("longitude")) - event.point.lon = Double.parseDouble(iter.readString()); - else if(field.equals("latitude")) - event.point.lat = Double.parseDouble(iter.readString()); - else if(field.equals("id")) - event.vehicleId = iter.readString(); - else if(field.equals("eventType")) - eventType = iter.readString(); - else if(field.equals("eventValue")) - eventValue = iter.readDouble(); - else - iter.read(); // no-op - } - - if(eventType != null && !eventType.trim().equals("")) { - event.eventData = new HashMap<>(); - event.eventData.put(eventType, eventValue); - } - - // single event per line - // TODO add grouped trace in JSON format - - list.add(event); - } - catch(Exception e) { - - logger.error("Unable to parse line: " + value); - - } - - return list; - } -} diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java new file mode 100644 index 0000000..6688df0 --- /dev/null +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonInputFormat.java @@ -0,0 +1,92 @@ +package io.sharedstreets.matcher.ingest.input.json; + +import io.sharedstreets.matcher.ingest.model.InputEvent; +import io.sharedstreets.matcher.ingest.model.JsonInputObject; +import 
io.sharedstreets.matcher.ingest.model.Point; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalFileStatus; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +public class JsonInputFormat extends FileInputFormat { + + private final AtomicInteger numEvents = new AtomicInteger(0); + private final AtomicInteger currentEventIndex = new AtomicInteger(0); + private transient JsonInputObject parsedFileContents = new JsonInputObject(); + + public JsonInputFormat(Path filePath) { + super(filePath); + } + + @Override + public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { + if (minNumSplits < 1) { + throw new IllegalArgumentException("Number of input splits has to be at least 1."); + } + + // take the desired number of splits into account + minNumSplits = Math.max(minNumSplits, this.numSplits); + + final List inputSplits = new ArrayList<>(minNumSplits); + final FileSystem fs = filePath.getFileSystem(); + File dir = new File(filePath.getPath()); + File[] files = dir.listFiles(); + + if (files != null) { + int splitNum = 0; + + for (File f : files) { + FileStatus file = new LocalFileStatus(f, fs); + FileInputSplit split = new FileInputSplit(splitNum++, file.getPath(), 0, file.getLen(), null); + inputSplits.add(split); + } + } + + return inputSplits.toArray(new FileInputSplit[0]); + } + + @Override + public void open(FileInputSplit fileSplit) throws IOException { + super.open(fileSplit); + String pathToFile = filePath + "/" + currentSplit.getPath().getName(); + + JsonInputObject result = JsonParser.parseJson(pathToFile); + + if (result != null) { + parsedFileContents = result; + numEvents.compareAndSet(0, 
result.eventData.size()); + } + } + + @Override + public boolean reachedEnd() { + return currentEventIndex.get() >= numEvents.get(); + } + + @Override + public InputEvent nextRecord(InputEvent reuse) { + int index = incrementEventIndex(); + + reuse.vehicleId = parsedFileContents.eventData.get(index).vehicleId; + reuse.time = parsedFileContents.eventData.get(index).timeStamp; + reuse.point = new Point(parsedFileContents.eventData.get(index).longitude, parsedFileContents.eventData.get(index).latitude); + + if (parsedFileContents.eventData.get(index).eventType != null) { + reuse.eventData = parsedFileContents.eventData.get(index).eventType; + } + + return reuse; + } + + private synchronized int incrementEventIndex() { + return currentEventIndex.getAndIncrement(); + } +} diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonParser.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonParser.java new file mode 100644 index 0000000..05300a9 --- /dev/null +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/input/json/JsonParser.java @@ -0,0 +1,39 @@ +package io.sharedstreets.matcher.ingest.input.json; + +import com.google.gson.Gson; +import io.sharedstreets.matcher.ingest.model.JsonInputObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; + +class JsonParser { + + private static Logger logger = LoggerFactory.getLogger(JsonParser.class); + + static JsonInputObject parseJson(final String pathToFile) { + if (pathToFile == null || pathToFile.isEmpty()) { + logger.error("Path is empty or null when parsing Json File"); + return null; + } + JsonInputObject result = null; + + try (BufferedReader reader = new BufferedReader(new FileReader(new File(pathToFile)))) { + + Gson gson = new Gson(); + StringBuilder builder = new StringBuilder(); + String line; + + while ((line = reader.readLine()) != null) { + builder.append(line); + } + reader.close(); + + result = gson.fromJson(builder.toString(), 
JsonInputObject.class); + } catch (IOException e) { + e.printStackTrace(); + } + + return result; + } +} diff --git a/ingest/src/main/java/io/sharedstreets/matcher/ingest/model/JsonInputObject.java b/ingest/src/main/java/io/sharedstreets/matcher/ingest/model/JsonInputObject.java new file mode 100644 index 0000000..c2666d7 --- /dev/null +++ b/ingest/src/main/java/io/sharedstreets/matcher/ingest/model/JsonInputObject.java @@ -0,0 +1,30 @@ +package io.sharedstreets.matcher.ingest.model; + +import com.google.gson.annotations.SerializedName; + +import java.util.HashMap; +import java.util.List; + +public class JsonInputObject { + + @SerializedName("eventData") + public List eventData; + + public class JsonEventObject { + + @SerializedName("vehicleId") + public String vehicleId; + + @SerializedName("timeStamp") + public Long timeStamp; + + @SerializedName("latitude") + public double latitude; + + @SerializedName("longitude") + public double longitude; + + @SerializedName("eventType") + public HashMap eventType; + } +} diff --git a/sample_data/json/json_trace1.json b/sample_data/json/json_trace1.json new file mode 100644 index 0000000..8721958 --- /dev/null +++ b/sample_data/json/json_trace1.json @@ -0,0 +1,49 @@ +{ + "eventData": [ + { + "vehicleId":"test_trace", + "timeStamp":1550021697198, + "latitude":32.981278, + "longitude":-96.70976413, + "eventType": { + "gpsSpeed":12.310797 + } + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021707198, + "latitude":32.98110025, + "longitude":-96.709954083, + "eventType": { + "gpsSpeed":11.960151 + } + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021717198, + "latitude":32.98100383, + "longitude":-96.71010795, + "eventType": { + "gpsSpeed":9.841646 + } + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021727198, + "latitude":32.98092837, + "longitude":-96.71021082, + "eventType": { + "gpsSpeed":5.802207 + } + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021757198, + "latitude":32.98087908, + 
"longitude":-96.71030881, + "eventType": { + "gpsSpeed":3.620221 + } + } + ] +} \ No newline at end of file diff --git a/sample_data/json/json_trace2.json b/sample_data/json/json_trace2.json new file mode 100644 index 0000000..352b267 --- /dev/null +++ b/sample_data/json/json_trace2.json @@ -0,0 +1,40 @@ +{ + "eventData": [ + { + "vehicleId":"test_trace", + "timeStamp":1550021697198, + "latitude":32.981278, + "longitude":-96.70976413, + "eventType": { + "PICKUP":0.0 + } + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021707198, + "latitude":32.98110025, + "longitude":-96.709954083 + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021717198, + "latitude":32.98100383, + "longitude":-96.71010795 + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021727198, + "latitude":32.98092837, + "longitude":-96.71021082 + }, + { + "vehicleId":"test_trace", + "timeStamp":1550021757198, + "latitude":32.98087908, + "longitude":-96.71030881, + "eventType": { + "DROPOFF":0.0 + } + } + ] +} \ No newline at end of file