diff --git a/examples/fileIO/README.md b/examples/fileIO/README.md new file mode 100644 index 0000000000..651f42557b --- /dev/null +++ b/examples/fileIO/README.md @@ -0,0 +1,74 @@ +## Sample application for File Read-Write implementation + +This example shows how to extend the `AbstractFileInputOperator` and `AbstractFileOutputOperator` from the [Malhar library](https://github.com/apache/apex-malhar) to create a high-performance application to copy files. There are two examples in the project: + +1. FileIO: This application copies text file contents line by line to the specified destination. The required properties for the application are specified in src/main/resources/META-INF/properties-FileIO.xml. + +2. ThroughputBasedFileIO: This application copies file contents block by block to the specified destination. It makes use of `AbstractThroughputFileInputOperator` from the [Malhar library](https://github.com/apache/apex-malhar). The required properties for the application are specified in src/main/resources/META-INF/properties-ThroughputBasedFileIO.xml. + +#### Follow these steps to run this application + +**Step 1**: Update the input and output file/folder properties in your application properties file. + +For the input location, update the property `dt.operator.read.prop.directory` + +For the output location, update the property `dt.operator.write.prop.filePath` + +**Step 2**: Build the code: + + shell> mvn clean install + +Upload the `target/fileIO-1.0-SNAPSHOT.apa` to the UI console if available, or launch it from the command line using `apexcli`. + +**Step 3**: During launch, use `src/main/resources/META-INF/properties-*.xml` as a custom configuration file; then verify +that the output directory has the expected output. + +**Note**: The application can be run in local mode within your IDE by simply running the tests available in the `src/test/java` folder. 
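The two properties from Step 1 can be set in the custom configuration file used in Step 3; a minimal sketch of what the relevant entries might look like (the `/tmp/fileIO/...` paths below are illustrative placeholders, not part of the example):

```xml
<configuration>
  <!-- directory monitored by the "read" operator -->
  <property>
    <name>dt.operator.read.prop.directory</name>
    <value>/tmp/fileIO/input-dir</value>
  </property>
  <!-- destination path used by the "write" operator -->
  <property>
    <name>dt.operator.write.prop.filePath</name>
    <value>/tmp/fileIO/output-dir</value>
  </property>
</configuration>
```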
+ +## Sample application for Multi Directory File Read-Write Implementation + +This example is very similar to the fileIO example with one difference: it shows how to +create a set of partitions separated into slices, where each slice monitors a different +input directory. A custom partitioner and directory scanner are used. + +## Sample application for File Input & Output operators + +Sample application to show how to use the file input and output operators. + +During a typical run on a Hadoop cluster, when input files are dropped into the +configured input directory (e.g. `/tmp/SimpleFileIO/input-dir`), the application +will create temporary files like this at the configured output location in +HDFS (e.g. `/tmp/SimpleFileIO/output-dir`) and copy all input file data to it: + + /tmp/SimpleFileIO/output-dir/myfile_p2.0.1465929407447.tmp + +When the file size exceeds the configured limit of 100000 bytes, a new file with +a name like `myfile_p2.1.1465929407447.tmp` will be opened and, a minute or two +later, the old file will be renamed to `myfile_p2.0`. + +## Sample application for file output operator with partitioning and rolling file output + +Sample application to show how to use the file output operator along with +partitioning and rolling file output. + +A typical run on a Hadoop cluster will create files like this at the configured +output location in HDFS (e.g. 
`/tmp/fileOutput`) where the numeric extension is +the sequence number of rolling output files and the number following 'p' is the +operator id of the partition that generated the file: + + /tmp/fileOutput/sequence_p3.0 + /tmp/fileOutput/sequence_p3.1 + /tmp/fileOutput/sequence_p4.0 + /tmp/fileOutput/sequence_p4.1 + +Each file should contain lines like this, where the second value is the number +produced by the generator operator and the first is the corresponding operator id: + + [1, 1075] + [1, 1095] + [2, 1110] + [2, 1120] + +Please note that there are no guarantees about the way operator ids are assigned +to operators by the platform. + diff --git a/examples/fileIO/pom.xml b/examples/fileIO/pom.xml new file mode 100644 index 0000000000..e069acc6f3 --- /dev/null +++ b/examples/fileIO/pom.xml @@ -0,0 +1,51 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.apex</groupId> + <artifactId>malhar-examples</artifactId> + <version>3.8.0-SNAPSHOT</version> + </parent> + + <artifactId>malhar-examples-fileIO</artifactId> + <packaging>jar</packaging> + + <name>FileIO examples</name> + <description>Demonstrates the FileIO related examples</description> + + <dependencies> + <dependency> + <groupId>org.apache.apex</groupId> + <artifactId>apex-engine</artifactId> + <version>${apex.core.version}</version> + <scope>test</scope> + </dependency> + </dependencies> +</project> + diff --git a/examples/fileIO/src/assemble/appPackage.xml b/examples/fileIO/src/assemble/appPackage.xml new file mode 100644 index 0000000000..a8708074a2 --- /dev/null +++ b/examples/fileIO/src/assemble/appPackage.xml @@ -0,0 +1,63 @@ +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>appPackage</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <fileSets> + <fileSet> + <directory>${basedir}/target/</directory> + <outputDirectory>/app</outputDirectory> + <includes> + <include>${project.artifactId}-${project.version}.jar</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/target/deps</directory> + <outputDirectory>/lib</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/site/conf</directory> + <outputDirectory>/conf</outputDirectory> + <includes> + <include>*.xml</include> + </includes> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/META-INF</directory> + <outputDirectory>/META-INF</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/app</directory> + <outputDirectory>/app</outputDirectory> + </fileSet> + <fileSet> + <directory>${basedir}/src/main/resources/resources</directory> + <outputDirectory>/resources</outputDirectory> + </fileSet> + </fileSets> +</assembly> + diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/Application.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/Application.java new file mode 100644 index 0000000000..1831767f5d --- /dev/null +++ 
b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/Application.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.examples.fileIO; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import static com.datatorrent.api.Context.PortContext.PARTITION_PARALLEL; + +@ApplicationAnnotation(name = "FileIO") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // create operators + FileReader reader = dag.addOperator("read", FileReader.class); + FileWriter writer = dag.addOperator("write", FileWriter.class); + + // using parallel partitioning ensures that lines from a single file are handled + // by the same writer + // + dag.setInputPortAttribute(writer.input, PARTITION_PARALLEL, true); + dag.setInputPortAttribute(writer.control, PARTITION_PARALLEL, true); + + dag.addStream("data", reader.output, writer.input); + dag.addStream("ctrl", reader.control, writer.control); + } +} diff --git 
a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/BytesFileWriter.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/BytesFileWriter.java new file mode 100644 index 0000000000..6484c1e80a --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/BytesFileWriter.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.examples.fileIO; + +import java.util.ArrayList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +public class BytesFileWriter extends AbstractFileOutputOperator<byte[]> +{ + private static final transient Logger LOG = LoggerFactory.getLogger(BytesFileWriter.class); + private static final char START_FILE = ThroughputBasedReader.START_FILE; + private static final char FINISH_FILE = ThroughputBasedReader.FINISH_FILE; + private String fileName; // current file name + private boolean eof; + private transient ArrayList<byte[]> savedLines = new ArrayList<>(); + + public final transient DefaultInputPort<String> control = new DefaultInputPort<String>() + { + @Override + public void process(String tuple) + { + processControlTuple(tuple); + } + }; + + private void processControlTuple(final String tuple) + { + if (START_FILE == tuple.charAt(0)) { + // sanity check + if (null != fileName) { + throw new RuntimeException(String.format("Error: fileName = %s, expected null", fileName)); + } + + fileName = tuple.substring(1); + if (!savedLines.isEmpty()) { + LOG.debug("Processing {} saved lines", savedLines.size()); + for (byte[] line : savedLines) { + processTuple(line); + } + savedLines.clear(); + } + return; + } + + final int last = tuple.length() - 1; + if (FINISH_FILE == tuple.charAt(last)) { // end of file + String name = tuple.substring(0, last); + LOG.info("Closing file: " + name); + if (null == fileName || !fileName.equals(name)) { + throw new RuntimeException(String.format("Error: fileName = %s != %s = tuple", fileName, tuple)); + } + eof = true; + return; + } + } + + @Override + public void endWindow() + { + if (!eof) { + return; + } + + // got an EOF, so must have a file name + if (null == fileName) { + throw new RuntimeException("Error: fileName empty"); + } + requestFinalize(fileName); + super.endWindow(); + + eof = false; + fileName = null; + } 
+ + @Override + protected String getFileName(byte[] tuple) + { + return fileName; + } + + @Override + protected byte[] getBytesForTuple(byte[] tuple) + { + return tuple; + } + + @Override + public void processTuple(byte[] tuple) + { + if (null == fileName) { + savedLines.add(tuple); + return; + } + super.processTuple(tuple); + } + +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/FileReader.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/FileReader.java new file mode 100644 index 0000000000..f38bacf5e7 --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/FileReader.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.examples.fileIO; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; + +/** + * read lines from input file and emit them on output port; if end-of-file is reached, a control tuple + * is emitted on the control port + */ +public class FileReader extends AbstractFileInputOperator<String> +{ + private static final Logger LOG = LoggerFactory.getLogger(FileReader.class); + + /** + * prefix for file start and finish control tuples + */ + public static final char START_FILE = '('; + public static final char FINISH_FILE = ')'; + + /** + * output port for file data + */ + @OutputPortFieldAnnotation(optional = false) + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + /** + * output port for control data + */ + @OutputPortFieldAnnotation(optional = false) + public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>(); + + private transient BufferedReader br = null; + + // Path is not serializable so convert to/from string for persistence + private transient Path filePath; + private String filePathStr; + + // set to true when end-of-file occurs, to prevent emission of additional tuples in current window + private boolean stop; + + // pause for this many milliseconds after end-of-file + private transient int pauseTime; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + + pauseTime = context.getValue(Context.OperatorContext.SPIN_MILLIS); + + if (null != filePathStr) { // restarting from checkpoint + filePath = new Path(filePathStr); + } + } + + @Override + public void endWindow() + { 
+ super.endWindow(); + stop = false; + } + + @Override + public void emitTuples() + { + if (!stop) { // normal processing + super.emitTuples(); + return; + } + + // we have end-of-file, so emit no further tuples till next window; relax for a bit + try { + Thread.sleep(pauseTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted"); + } + } + + @Override + protected InputStream openFile(Path curPath) throws IOException + { + LOG.debug("openFile: curPath = {}", curPath); + filePath = curPath; + filePathStr = filePath.toString(); + + // new file started, send control tuple on control port + control.emit(START_FILE + filePath.getName()); + + InputStream is = super.openFile(filePath); + br = new BufferedReader(new InputStreamReader(is)); + return is; + } + + @Override + protected void closeFile(InputStream is) throws IOException + { + LOG.debug("closeFile: filePath = {}", filePath); + super.closeFile(is); + + // reached end-of-file, send control tuple on control port + control.emit(filePath.getName() + FINISH_FILE); + + br.close(); + br = null; + filePath = null; + filePathStr = null; + stop = true; + } + + @Override + protected String readEntity() throws IOException + { + // try to read a line + final String line = br.readLine(); + if (null != line) { // normal case + LOG.debug("readEntity: line = {}", line); + return line; + } + + // end-of-file (control tuple sent in closeFile() + LOG.info("readEntity: EOF for {}", filePath); + return null; + } + + @Override + protected void emit(String tuple) + { + output.emit(tuple); + } + +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/FileWriter.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/FileWriter.java new file mode 100644 index 0000000000..0e0c1d18cc --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/FileWriter.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.examples.fileIO; + +import java.util.ArrayList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * Write incoming line to output file + */ +public class FileWriter extends AbstractFileOutputOperator<String> +{ + private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class); + private static final String CHARSET_NAME = "UTF-8"; + private static final String NL = System.lineSeparator(); + private static final char START_FILE = FileReader.START_FILE; + private static final char FINISH_FILE = FileReader.FINISH_FILE; + + private String fileName; // current file name + + private boolean eof; + + // lines that arrive before the start control tuple are saved here + private ArrayList<String> savedLines = new ArrayList<>(); + + /** + * control port for file start/finish control tuples + */ + public final transient DefaultInputPort<String> control = new DefaultInputPort<String>() + { + @Override + public void process(String tuple) + { + processControlTuple(tuple); + } + }; + + private void processControlTuple(final String tuple) + { + if (START_FILE == tuple.charAt(0)) { // start of file + LOG.debug("start tuple = {}", 
tuple); + + // sanity check + if (null != fileName) { + throw new RuntimeException(String.format("Error: fileName = %s, expected null", fileName)); + } + + fileName = tuple.substring(1); + + // if we have saved lines, process them + if (!savedLines.isEmpty()) { + LOG.debug("Processing {} saved lines", savedLines.size()); + for (String line : savedLines) { + processTuple(line); + } + savedLines.clear(); + } + + return; + } + + final int last = tuple.length() - 1; + if (FINISH_FILE == tuple.charAt(last)) { // end of file + LOG.debug("finish tuple = {}", tuple); + String name = tuple.substring(0, last); + + // sanity check : should match what we got with start control tuple + if (null == fileName || !fileName.equals(name)) { + throw new RuntimeException(String.format("Error: fileName = %s != %s = tuple", fileName, tuple)); + } + + eof = true; + return; + } + + // should never happen + throw new RuntimeException("Error: Bad control tuple: " + tuple); + } + + @Override + public void processTuple(String tuple) + { + if (null == fileName) { + savedLines.add(tuple); + return; + } + + super.processTuple(tuple); + } + + @Override + public void endWindow() + { + if (!eof) { + return; + } + + // got an EOF, so must have a file name + if (null == fileName) { + throw new RuntimeException("Error: fileName empty"); + } + + LOG.info("requesting finalize of {}", fileName); + requestFinalize(fileName); + super.endWindow(); + + eof = false; + fileName = null; + } + + @Override + protected String getFileName(String tuple) + { + return fileName; + } + + @Override + protected byte[] getBytesForTuple(String line) + { + LOG.debug("getBytesForTuple: line.length = {}", line.length()); + + byte[] result = null; + try { + result = (line + NL).getBytes(CHARSET_NAME); + } catch (Exception e) { + LOG.error("Error: got exception", e); + throw new RuntimeException(e); + } + return result; + } + +} diff --git 
a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/ThroughputBasedApplication.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/ThroughputBasedApplication.java new file mode 100644 index 0000000000..001e9c939b --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/ThroughputBasedApplication.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.examples.fileIO; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import static com.datatorrent.api.Context.PortContext.PARTITION_PARALLEL; + +@ApplicationAnnotation(name = "ThroughputBasedFileIO") +public class ThroughputBasedApplication implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + ThroughputBasedReader reader = dag.addOperator("read", ThroughputBasedReader.class); + BytesFileWriter writer = dag.addOperator("write", BytesFileWriter.class); + + dag.setInputPortAttribute(writer.input, PARTITION_PARALLEL, true); + dag.setInputPortAttribute(writer.control, PARTITION_PARALLEL, true); + + dag.addStream("data", reader.output, writer.input); + dag.addStream("ctrl", reader.control, writer.control); + } + +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/ThroughputBasedReader.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/ThroughputBasedReader.java new file mode 100644 index 0000000000..c8fb9c4aa2 --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIO/ThroughputBasedReader.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.examples.fileIO; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.lib.io.fs.AbstractThroughputFileInputOperator; + +public class ThroughputBasedReader extends AbstractThroughputFileInputOperator<byte[]> +{ + /** + * prefix for file start and finish control tuples + */ + public static final char START_FILE = '('; + public static final char FINISH_FILE = ')'; + private static final int DEFAULT_BLOCK_SIZE = 1024 * 64; + + @OutputPortFieldAnnotation(optional = false) + public final transient DefaultOutputPort<byte[]> output = new DefaultOutputPort<>(); + @OutputPortFieldAnnotation(optional = false) + public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>(); + + private boolean endOfFile; + private transient int pauseTime; + private int blockSize; + private int blockThreshold; + private int blocksReadInWindow; + + @Override + public void setup(Context.OperatorContext context) + { + if (blockSize == 0) { + blockSize = DEFAULT_BLOCK_SIZE; + } + if (blockThreshold == 0) { + blockThreshold = 2; + } + super.setup(context); + pauseTime = context.getValue(Context.OperatorContext.SPIN_MILLIS); + } + + @Override + public void beginWindow(long windowId) + { + super.beginWindow(windowId); + blocksReadInWindow = 0; + } + + @Override + public void endWindow() + { 
+ super.endWindow(); + endOfFile = false; + } + + @Override + public void emitTuples() + { + if (blocksReadInWindow >= blockThreshold) { + try { + Thread.sleep(pauseTime); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return; + } + if (!endOfFile) { + super.emitTuples(); + return; + } + } + + @Override + protected InputStream openFile(Path path) throws IOException + { + control.emit(START_FILE + path.getName()); + return super.openFile(path); + } + + @Override + protected void closeFile(InputStream is) throws IOException + { + endOfFile = true; + control.emit(new File(currentFile).getName() + FINISH_FILE); + super.closeFile(is); + } + + @Override + protected byte[] readEntity() throws IOException + { + byte[] block = new byte[blockSize]; + int bytesRead = inputStream.read(block, 0, blockSize); + if (bytesRead <= 0) { + return null; + } + blocksReadInWindow++; + return Arrays.copyOf(block, bytesRead); + } + + @Override + protected void emit(byte[] tuple) + { + output.emit(tuple); + } + + public int getBlockSize() + { + return blockSize; + } + + public void setBlockSize(int blockSize) + { + this.blockSize = blockSize; + } + + /** + * Gets number of blocks to emit per window + * + * @return + */ + public int getBlockThreshold() + { + return blockThreshold; + } + + /** + * Sets number of blocks to emit per window + * + * @param blockThreshold + */ + public void setBlockThreshold(int blockThreshold) + { + this.blockThreshold = blockThreshold; + } +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/Application.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/Application.java new file mode 100644 index 0000000000..8da1c95d60 --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/Application.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.examples.fileIOMultiDir; + +import org.apache.hadoop.conf.Configuration; +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +import static com.datatorrent.api.Context.PortContext.PARTITION_PARALLEL; + +@ApplicationAnnotation(name = "FileIO") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // create operators + FileReader reader = dag.addOperator("read", FileReader.class); + FileWriter writer = dag.addOperator("write", FileWriter.class); + + reader.setScanner(new FileReaderMultiDir.SlicedDirectoryScanner()); + + // using parallel partitioning ensures that lines from a single file are handled + // by the same writer + // + dag.setInputPortAttribute(writer.input, PARTITION_PARALLEL, true); + dag.setInputPortAttribute(writer.control, PARTITION_PARALLEL, true); + + dag.addStream("data", reader.output, writer.input); + dag.addStream("ctrl", reader.control, writer.control); + } +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/FileReader.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/FileReader.java new 
file mode 100644 index 0000000000..2f790b81da --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/FileReader.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.examples.fileIOMultiDir; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.Path; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; + + +/** + * read lines from input file and emit them on output port; a begin-file control tuple + * is emitted when a file is opened and an end-file control tuple when EOF is reached + */ +public class FileReader extends FileReaderMultiDir +{ + private static final Logger LOG = LoggerFactory.getLogger(FileReader.class); + + /** + * prefix for file start and finish control tuples + */ + public static final char START_FILE = '('; + public static final char FINISH_FILE = ')'; + + /** + * output port for file data + */ + @OutputPortFieldAnnotation(optional = false) + public 
final transient DefaultOutputPort<String> output = new DefaultOutputPort<>(); + + /** + * output port for control data + */ + @OutputPortFieldAnnotation(optional = false) + public final transient DefaultOutputPort<String> control = new DefaultOutputPort<>(); + + private transient BufferedReader br = null; + + // Path is not serializable so convert to/from string for persistence + private transient Path filePath; + private String filePathStr; + + // set to true when end-of-file occurs, to prevent emission of additional tuples in current window + private boolean stop; + + // pause for this many milliseconds after end-of-file + private transient int pauseTime; + + @Override + public void setup(Context.OperatorContext context) + { + super.setup(context); + SlicedDirectoryScanner sds = (SlicedDirectoryScanner)getScanner(); + LOG.info("setup: directory = {}; scanner: " + + "(startIndex = {}, endIndex = {}, pIndex = {})", getDirectory(), + sds.getStartIndex(), sds.getEndIndex(), sds.getpIndex()); + + pauseTime = context.getValue(Context.OperatorContext.SPIN_MILLIS); + + if (null != filePathStr) { // restarting from checkpoint + filePath = new Path(filePathStr); + } + } + + @Override + public void endWindow() + { + super.endWindow(); + stop = false; + } + + @Override + public void emitTuples() + { + if (!stop) { // normal processing + super.emitTuples(); + return; + } + + // we have end-of-file, so emit no further tuples till next window; relax for a bit + try { + Thread.sleep(pauseTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted"); + } + } + + @Override + protected InputStream openFile(Path curPath) throws IOException + { + LOG.debug("openFile: curPath = {}", curPath); + filePath = curPath; + filePathStr = filePath.toString(); + + // new file started, send control tuple on control port + control.emit(START_FILE + filePath.getName()); + + InputStream is = super.openFile(filePath); + br = new BufferedReader(new InputStreamReader(is)); + return is; + } + + 
@Override + protected void closeFile(InputStream is) throws IOException + { + LOG.debug("closeFile: filePath = {}", filePath); + super.closeFile(is); + + // reached end-of-file, send control tuple on control port + control.emit(filePath.getName() + FINISH_FILE); + + br.close(); + br = null; + filePath = null; + filePathStr = null; + stop = true; + } + + @Override + protected String readEntity() throws IOException + { + // try to read a line + final String line = br.readLine(); + if (null != line) { // normal case + LOG.debug("readEntity: line = {}", line); + return line; + } + + // end-of-file (control tuple sent in closeFile()) + LOG.info("readEntity: EOF for {}", filePath); + return null; + } + + @Override + protected void emit(String tuple) + { + output.emit(tuple); + } + +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/FileReaderMultiDir.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/FileReaderMultiDir.java new file mode 100644 index 0000000000..bea75351a6 --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/FileReaderMultiDir.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.apex.examples.fileIOMultiDir; + +import java.io.ByteArrayOutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.commons.io.IOUtils; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; + +import com.datatorrent.api.DefaultPartition; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; + +public abstract class FileReaderMultiDir extends AbstractFileInputOperator<String> +{ + // parallel arrays: directories[i] is monitored by partitionCounts[i] partitions + // which should be contiguous in the list of partitions. + // + @NotNull + private String[] directories; + @NotNull + private int[] partitionCounts; + + /** + * List of monitored directories + */ + public String[] getDirectories() + { + return directories; + } + + public void setDirectories(String[] v) + { + directories = v; + } + + /** + * Partition count for each monitored directory + */ + public int[] getPartitionCounts() + { + return partitionCounts; + } + + public void setPartitionCounts(int[] v) + { + partitionCounts = v; + } + + /** + * Clone object by serializing and deserializing using Kryo. + * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
+ * (from KryoCloneUtils.java) + * + * @param kryo kryo object used to clone objects + * @param src source object to copy from + * @return the cloned object + */ + @SuppressWarnings("unchecked") + public static <SRC> SRC cloneObject(Kryo kryo, SRC src) + { + kryo.setClassLoader(src.getClass().getClassLoader()); + ByteArrayOutputStream bos = null; + Output output; + Input input = null; + try { + bos = new ByteArrayOutputStream(); + output = new Output(bos); + kryo.writeObject(output, src); + output.close(); + input = new Input(bos.toByteArray()); + return (SRC)kryo.readObject(input, src.getClass()); + } finally { + IOUtils.closeQuietly(input); + IOUtils.closeQuietly(bos); + } + } + + @Override + public Collection<Partition<AbstractFileInputOperator<String>>> definePartitions( + Collection<Partition<AbstractFileInputOperator<String>>> partitions, PartitioningContext context) + { + + final int prevCount = partitions.size(); + if (1 < prevCount) { // dynamic repartition not supported in this example + throw new RuntimeException("Error: Dynamic repartition not supported"); + } + + // compute first and last indices of partitions for each directory + final int numDirs = directories.length; + final int numParCounts = partitionCounts.length; + final int[] sliceFirstIndex = new int[numDirs]; + + LOG.info("definePartitions: prevCount = {}, directories.length = {}, " + + "partitionCounts.length = {}", prevCount, numDirs, numParCounts); + + int nPartitions = 0; // desired number of partitions + + for (int i = 0; i < numDirs; ++i) { + sliceFirstIndex[i] = nPartitions; + final int nP = partitionCounts[i]; + LOG.info("definePartitions: i = {}, nP = {}, dir = {}", i, nP, directories[i]); + nPartitions += nP; + } + + + if (1 == nPartitions) { + LOG.info("definePartitions: Nothing to do in definePartitions"); + return partitions; // nothing to do + } + + if (nPartitions <= 0) { // error + final String msg = String.format("Error: Bad number of partitions %d%n", nPartitions); + LOG.error(msg); + throw new RuntimeException(msg); + } + this.partitionCount = nPartitions; + 
LOG.debug("definePartitions: Creating {} partitions", nPartitions); + + AbstractFileInputOperator<String> tempOperator = partitions.iterator().next().getPartitionedInstance(); + + /* + * Create partitions of scanners, scanner's partition method will do state + * transfer for DirectoryScanner objects. + */ + Kryo kryo = new Kryo(); + + SlicedDirectoryScanner sds = (SlicedDirectoryScanner)scanner; + List<DirectoryScanner> scanners = sds.partition(nPartitions, directories, + partitionCounts); + + // return value: new list of partitions (includes old list) + List<Partition<AbstractFileInputOperator<String>>> newPartitions + // = Lists.newArrayListWithExpectedSize(totalCount); + = new ArrayList<>(nPartitions); + + // parallel list of storage managers + Collection<WindowDataManager> newManagers + // = Lists.newArrayListWithExpectedSize(totalCount); + = new ArrayList<>(nPartitions); + + // setup new partitions + LOG.info("definePartitions: setting up {} new partitions with {} monitored directories", + nPartitions, numDirs); + + final WindowDataManager ism = getWindowDataManager(); + + // j is the index of current slice + int idx = 0; // index of partition + for (int j = 0; j < numDirs; ++j) { + int first = sliceFirstIndex[j]; + int last = first + partitionCounts[j]; + String dir = directories[j]; + LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir); + for (int i = first; i < last; ++i) { + AbstractFileInputOperator<String> oper = cloneObject(kryo, this); + oper.setDirectory(dir); + SlicedDirectoryScanner scn = (SlicedDirectoryScanner)scanners.get(i); + scn.setStartIndex(first); + scn.setEndIndex(last); + scn.setDirectory(dir); + + oper.setScanner(scn); + newPartitions.add(new DefaultPartition<>(oper)); + newManagers.add(oper.getWindowDataManager()); + } + } + + //ism.partitioned(newManagers, null); + LOG.info("definePartitions: returning {} partitions", newPartitions.size()); + return newPartitions; + } + + /** + * A directory scanner where each instance monitors a different directory + */ + public static class SlicedDirectoryScanner extends 
AbstractFileInputOperator.DirectoryScanner + { + // Each slice of partitions monitors a single directory. + // startIndex -- of partition slice + // endIndex -- of partition slice (1 beyond last index of slice) + // pIndex -- index of this partition (we don't use the partitionIndex field in + // DirectoryScanner since it is private and has no accessors defined; we also use + // it differently here.) + // + int startIndex; + int endIndex; + int pIndex; + + // the monitored directory + String directory; + + // helper method to create the collection of new partitions + // total -- the total number of desired partitions; must equal sum of pCounts + // dirs -- list of monitored directories, one per slice + // pCounts -- number of partitions in each slice + // + private List<DirectoryScanner> partition(int total, String[] dirs, int[] pCounts) + { + ArrayList<DirectoryScanner> partitions = new ArrayList<>(total); + for (int i = 0; i < total; i++) { + final SlicedDirectoryScanner that = new SlicedDirectoryScanner(); + that.setFilePatternRegexp(getFilePatternRegexp()); + that.setpIndex(i); + // startIndex, endIndex and directory set later in definePartitions + partitions.add(that); + } + return partitions; + } + + @Override + protected boolean acceptFile(String filePathStr) + { + LOG.debug("startIndex = {}, endIndex = {}", startIndex, endIndex); + + int sliceSize = endIndex - startIndex; + + // check if file should be processed by this partition + if (sliceSize > 1) { + // find the partition to receive this file + final int i = filePathStr.hashCode(); + int mod = i % sliceSize; + if (mod < 0) { + mod += sliceSize; + } + LOG.debug("partitionIndex = {}, startIndex = {}, endIndex = {}, sliceSize = {}, " + + "filePathStr = {}, hashcode = {}, mod = {}", + pIndex, startIndex, endIndex, sliceSize, filePathStr, i, mod); + + if ((startIndex + mod) != pIndex) { + return false; + } + } + + // check if file matches regex filter + Pattern regex = this.getRegex(); + if (regex != null) { + Matcher matcher = 
regex.matcher(filePathStr); + if (!matcher.matches()) { + return false; + } + } + return true; + } // acceptFile + + // getters and setters + public String getDirectory() + { + return directory; + } + + public void setDirectory(String v) + { + directory = v; + } + + public int getStartIndex() + { + return startIndex; + } + + public void setStartIndex(int v) + { + startIndex = v; + } + + public int getEndIndex() + { + return endIndex; + } + + public void setEndIndex(int v) + { + endIndex = v; + } + + public int getpIndex() + { + return pIndex; + } + + public void setpIndex(int v) + { + pIndex = v; + } + + } // SlicedDirectoryScanner + + private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class); +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/FileWriter.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/FileWriter.java new file mode 100644 index 0000000000..0e79cbc095 --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOMultiDir/FileWriter.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.examples.fileIOMultiDir; + +import java.util.ArrayList; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * Write incoming line to output file + */ +public class FileWriter extends AbstractFileOutputOperator<String> +{ + private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class); + private static final String CHARSET_NAME = "UTF-8"; + private static final String NL = System.lineSeparator(); + private static final char START_FILE = FileReader.START_FILE; + private static final char FINISH_FILE = FileReader.FINISH_FILE; + + private String fileName; // current file name + + private boolean eof; + + // lines that arrive before the start control tuple are saved here + private ArrayList<String> savedLines = new ArrayList<>(); + + /** + * control port for file start/finish control tuples + */ + public final transient DefaultInputPort<String> control = new DefaultInputPort<String>() + { + @Override + public void process(String tuple) + { + processControlTuple(tuple); + } + }; + + private void processControlTuple(final String tuple) + { + if (START_FILE == tuple.charAt(0)) { // start of file + LOG.debug("start tuple = {}", tuple); + + // sanity check + if (null != fileName) { + throw new RuntimeException(String.format("Error: fileName = %s, expected null", fileName)); + } + + fileName = tuple.substring(1); + + // if we have saved lines, process them + if (!savedLines.isEmpty()) { + LOG.debug("Processing {} saved lines", savedLines.size()); + for (String line : savedLines) { + processTuple(line); + } + savedLines.clear(); + } + + return; + } + + final int last = tuple.length() - 1; + if (FINISH_FILE == tuple.charAt(last)) { // end of file + LOG.debug("finish tuple = {}", tuple); + String name = tuple.substring(0, last); + + // sanity check : should match what we got with start control tuple + if (null == fileName || 
!fileName.equals(name)) { + throw new RuntimeException(String.format("Error: fileName = %s != %s = tuple", fileName, tuple)); + } + + eof = true; + return; + } + + // should never happen + throw new RuntimeException("Error: Bad control tuple: " + tuple); + } + + @Override + public void processTuple(String tuple) + { + if (null == fileName) { + savedLines.add(tuple); + return; + } + + super.processTuple(tuple); + } + + @Override + public void endWindow() + { + if (!eof) { + return; + } + + // got an EOF, so must have a file name + if (null == fileName) { + throw new RuntimeException("Error: fileName empty"); + } + + LOG.info("requesting finalize of {}", fileName); + requestFinalize(fileName); + super.endWindow(); + + eof = false; + fileName = null; + } + + @Override + protected String getFileName(String tuple) + { + return fileName; + } + + @Override + protected byte[] getBytesForTuple(String line) + { + LOG.debug("getBytesForTuple: line.length = {}", line.length()); + + byte[] result = null; + try { + result = (line + NL).getBytes(CHARSET_NAME); + } catch (Exception e) { + LOG.info("Error: got exception {}", e); + throw new RuntimeException(e); + } + return result; + } + +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOSimple/Application.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOSimple/Application.java new file mode 100644 index 0000000000..550186e665 --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOSimple/Application.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.examples.fileIOSimple; + +import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +/** + * Simple application illustrating file input-output + */ +@ApplicationAnnotation(name = "SimpleFileIO") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + // create operators + LineByLineFileInputOperator in = dag.addOperator("input", + new LineByLineFileInputOperator()); + FileOutputOperator out = dag.addOperator("output", + new FileOutputOperator()); + // configure operators + in.setDirectory("/tmp/SimpleFileIO/input-dir"); + out.setFilePath("/tmp/SimpleFileIO/output-dir"); + out.setMaxLength(1_000_000); // file rotation size + + // create streams + dag.addStream("data", in.output, out.input); + } +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOSimple/FileOutputOperator.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOSimple/FileOutputOperator.java new file mode 100644 index 0000000000..e11921239c --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileIOSimple/FileOutputOperator.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.examples.fileIOSimple; + +import javax.validation.constraints.NotNull; + +import com.datatorrent.api.Context; + +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * Write incoming lines to output file + */ +public class FileOutputOperator extends AbstractFileOutputOperator<String> +{ + private static final String CHARSET_NAME = "UTF-8"; + private static final String NL = System.lineSeparator(); + + @NotNull + private String fileName; + + private transient String fName; // per partition file name + + @Override + public void setup(Context.OperatorContext context) + { + // create file name for this partition by appending the operator id to + // the base name + // + long id = context.getId(); + fName = fileName + "_p" + id; + super.setup(context); + } + + @Override + protected String getFileName(String tuple) + { + return fName; + } + + @Override + protected byte[] getBytesForTuple(String line) + { + byte[] result = null; + try { + result = (line + NL).getBytes(CHARSET_NAME); + } catch (Exception e) { + throw new RuntimeException(e); + } + return result; + } + + // getters and setters + public String getFileName() + { + return fileName; + } + + public void setFileName(String v) + { + fileName = v; + } +} diff --git 
a/examples/fileIO/src/main/java/org/apache/apex/examples/fileOutput/Application.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileOutput/Application.java new file mode 100644 index 0000000000..4cdba22a66 --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileOutput/Application.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.examples.fileOutput; + +import org.apache.hadoop.conf.Configuration; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.ApplicationAnnotation; + +@ApplicationAnnotation(name = "fileOutput") +public class Application implements StreamingApplication +{ + + @Override + public void populateDAG(DAG dag, Configuration conf) + { + + SequenceGenerator generator = dag.addOperator("generator", SequenceGenerator.class); + + FileWriter writer = dag.addOperator("writer", FileWriter.class); + + // properties can be set here or from properties file + //writer.setMaxLength(1 << 10); + + dag.addStream("data", generator.out, writer.input); + } +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileOutput/FileWriter.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileOutput/FileWriter.java new file mode 100644 index 0000000000..1ec6cfb44a --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileOutput/FileWriter.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.apex.examples.fileOutput; + +import java.util.Arrays; + +import javax.validation.constraints.NotNull; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context; +import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; + +/** + * Write incoming tuples to output file + */ +public class FileWriter extends AbstractFileOutputOperator<Long[]> +{ + private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class); + private static final String CHARSET_NAME = "UTF-8"; + private static final String NL = System.lineSeparator(); + + @NotNull + private String fileName; // current base file name + + private transient String fName; // per partition file name + + @Override + public void setup(Context.OperatorContext context) + { + // create file name for this partition by appending the operator id to + // the base name + // + long id = context.getId(); + fName = fileName + "_p" + id; + super.setup(context); + + LOG.debug("Leaving setup, fName = {}, id = {}", fName, id); + } + + @Override + protected String getFileName(Long[] tuple) + { + return fName; + } + + @Override + protected byte[] getBytesForTuple(Long[] pair) + { + byte[] result = null; + try { + result = (Arrays.toString(pair) + NL).getBytes(CHARSET_NAME); + } catch (Exception e) { + LOG.info("Error: got exception {}", e); + throw new RuntimeException(e); + } + return result; + } + + // getters and setters + public String getFileName() + { + return fileName; + } + + public void setFileName(String v) + { + fileName = v; + } +} diff --git a/examples/fileIO/src/main/java/org/apache/apex/examples/fileOutput/SequenceGenerator.java b/examples/fileIO/src/main/java/org/apache/apex/examples/fileOutput/SequenceGenerator.java new file mode 100644 index 0000000000..34fcd3f80b --- /dev/null +++ b/examples/fileIO/src/main/java/org/apache/apex/examples/fileOutput/SequenceGenerator.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) 
under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.apex.examples.fileOutput; + +import javax.validation.constraints.Min; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + +/** + * Simple operator that emits pairs of integers where the first value is the + * operator id and the second forms elements of an arithmetic progression whose + * increment is 'divisor' (can be changed dynamically). 
+ */ +public class SequenceGenerator extends BaseOperator implements InputOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(SequenceGenerator.class); + + // properties + + @Min(1) + private int maxTuples = 5; // max number of tuples per window + @Min(1) + private long divisor = 1; // only values divisible by divisor are output + + private int sleepTime; + + private long nextValue; // next value to emit + + // transient fields + + private transient int numTuples = 0; // number emitted in current window + private transient long id; // operator id + + public final transient DefaultOutputPort<Long[]> out = new DefaultOutputPort<>(); + + @Override + public void setup(OperatorContext context) + { + super.setup(context); + + id = context.getId(); + sleepTime = context.getValue(OperatorContext.SPIN_MILLIS); + LOG.debug("Leaving setup, id = {}, sleepTime = {}, divisor = {}", + id, sleepTime, divisor); + } + + @Override + public void beginWindow(long windowId) + { + numTuples = 0; + super.beginWindow(windowId); + } + + @Override + public void emitTuples() + { + if (numTuples < maxTuples) { + // nextValue will normally be divisible by divisor but the divisor can be changed + // externally (e.g. 
after a repartition) so find next value that is divisible by + // divisor + // + final long rem = nextValue % divisor; + if (0 != rem) { + nextValue += (divisor - rem); + } + ++numTuples; + out.emit(new Long[]{id, nextValue}); + nextValue += divisor; + } else { + + try { + // avoid repeated calls to this function + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted"); + } + } + } + + // getters and setters + + public long getDivisor() + { + return divisor; + } + + public void setDivisor(long v) + { + divisor = v; + } + + public int getMaxTuples() + { + return maxTuples; + } + + public void setMaxTuples(int v) + { + maxTuples = v; + } +} diff --git a/examples/fileIO/src/main/resources/META-INF/properties-FileIO.xml b/examples/fileIO/src/main/resources/META-INF/properties-FileIO.xml new file mode 100644 index 0000000000..3df70043b0 --- /dev/null +++ b/examples/fileIO/src/main/resources/META-INF/properties-FileIO.xml @@ -0,0 +1,52 @@ +<?xml version="1.0"?> +<configuration> + <property> + <name>dt.application.FileIO.operator.read.prop.directory</name> + <value>/tmp/fileIO/input-dir</value> + </property> + <property> + <name>dt.application.FileIO.operator.write.prop.filePath</name> + <value>/tmp/fileIO/output-dir</value> + </property> + <property> + <name>dt.application.FileIO.operator.read.prop.partitionCount</name> + <value>5</value> + </property> + <property> + <name>dt.loggers.level</name> + <value>com.datatorrent.*:INFO,org.apache.*:INFO</value> + </property> +</configuration> + diff --git a/examples/fileIO/src/main/resources/META-INF/properties-ThroughputBasedFileIO.xml b/examples/fileIO/src/main/resources/META-INF/properties-ThroughputBasedFileIO.xml new file mode 100644 index 0000000000..da98a34d4e --- /dev/null +++ b/examples/fileIO/src/main/resources/META-INF/properties-ThroughputBasedFileIO.xml @@ -0,0 +1,103 @@ +<?xml version="1.0"?> +<configuration> + <property> + <name>dt.operator.read.prop.directory</name> + <value>/tmp/fileIO/input-dir</value> + </property> + <property> + <name>dt.operator.read.prop.pendingFilesPerOperator</name> + <value>2</value> + </property> + <property> + <name>dt.operator.read.prop.partitionCount</name> + <value>2</value> + </property> + <property> + <name>dt.operator.read.prop.blockSize</name> + <value>128</value> + </property> + <property> + <name>dt.operator.read.prop.repartitionInterval</name> + <value>30000</value> + </property> + <property> + <name>dt.operator.read.prop.blockThreshold</name> + <value>2</value> + </property> + <property> + <name>dt.operator.read.prop.emitBatchSize</name> + <value>2</value> + </property> + <property> + <name>dt.operator.read.attr.MEMORY_MB</name> + <value>1024</value> + </property> + <property> + <name>dt.operator.write.attr.MEMORY_MB</name> + <value>1024</value> + </property> + <property> + <name>dt.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> + <value>128</value> + </property> + <property> + <name>dt.operator.write.prop.filePath</name> + <value>/tmp/fileIO/output-dir</value> + </property> + <property> + <name>dt.operator.write.prop.rotationWindows</name> + <value>60</value> + </property> + <property> + <name>dt.loggers.level</name> + <value>com.datatorrent.*:DEBUG,org.apache.*:INFO</value> + </property> +</configuration> + diff --git a/examples/fileIO/src/main/resources/META-INF/properties-fileIO-multiDir.xml b/examples/fileIO/src/main/resources/META-INF/properties-fileIO-multiDir.xml new file mode 100644 index 0000000000..4dfb8d655a --- /dev/null +++ b/examples/fileIO/src/main/resources/META-INF/properties-fileIO-multiDir.xml @@ -0,0 +1,87 @@ +<?xml version="1.0"?> +<configuration> + <property> + <name>dt.application.FileIO.operator.read.prop.directory</name> + <value>/tmp/fileIO/dummy</value> + </property> + <property> + <name>dt.application.FileIO.operator.read.prop.directories</name> + <value>"/tmp/fileIO/in1","/tmp/fileIO/in2"</value> + </property> + <property> + <name>dt.application.FileIO.operator.read.prop.partitionCounts</name> + <value>2,3</value> + </property> + <property> + <name>dt.application.FileIO.operator.read.attr.MEMORY_MB</name> + <value>512</value> + </property> + <property> + <name>dt.application.FileIO.operator.write.attr.MEMORY_MB</name> + <value>512</value> + </property> + <property> + <name>dt.application.FileIO.operator.*.port.*.attr.BUFFER_MEMORY_MB</name> + <value>128</value> + </property> + <property> + <name>dt.application.FileIO.stream.data.locality</name> + <value>CONTAINER_LOCAL</value> + </property> + <property> + <name>dt.application.FileIO.stream.control.locality</name> + <value>CONTAINER_LOCAL</value> + </property> + <property> + <name>dt.application.FileIO.operator.write.prop.filePath</name> + <value>/tmp/fileIO/output-dir</value> + </property> + <property> + <name>dt.loggers.level</name> + <value>com.datatorrent.*:INFO,org.apache.*:INFO</value> + </property> +</configuration> + diff --git a/examples/fileIO/src/main/resources/META-INF/properties-fileIOSimple.xml b/examples/fileIO/src/main/resources/META-INF/properties-fileIOSimple.xml new file mode 100644 index 0000000000..caead28950 --- /dev/null +++ b/examples/fileIO/src/main/resources/META-INF/properties-fileIOSimple.xml @@ -0,0 +1,54 @@ +<?xml version="1.0"?> +<configuration> + <property> + <name>dt.application.SimpleFileIO.operator.input.prop.directory</name> + <value>/tmp/SimpleFileIO/input-dir</value> + </property> + <property> + <name>dt.application.SimpleFileIO.operator.output.prop.filePath</name> + <value>/tmp/SimpleFileIO/output-dir</value> + </property> + <property> + <name>dt.application.SimpleFileIO.operator.output.prop.fileName</name> + <value>myfile</value> + </property> + <property> + <name>dt.application.SimpleFileIO.operator.output.prop.maxLength</name> + <value>1000000</value> + </property> +</configuration> + diff --git a/examples/fileIO/src/main/resources/META-INF/properties-fileOutput.xml b/examples/fileIO/src/main/resources/META-INF/properties-fileOutput.xml new file mode 100644 index 0000000000..c81c5ee831 --- /dev/null +++ b/examples/fileIO/src/main/resources/META-INF/properties-fileOutput.xml @@ -0,0 +1,74 @@ +<?xml version="1.0"?> +<configuration> + <property> + <name>dt.application.*.operator.*.attr.MEMORY_MB</name> + <value>500</value> + </property> + <property> + <name>dt.application.fileOutput.operator.generator.prop.maxTuples</name> + <value>5</value> + </property> + <property> + <name>dt.application.fileOutput.operator.generator.prop.divisor</name> + <value>5</value> + </property> + <property> + <name>dt.application.fileOutput.operator.generator.attr.PARTITIONER</name> + <value>com.datatorrent.common.partitioner.StatelessPartitioner:2</value> + </property> + <property> + <name>dt.application.fileOutput.operator.writer.prop.maxLength</name> + <value>1024</value> + </property> + <property> + <name>dt.application.fileOutput.operator.writer.prop.filePath</name> + <value>/tmp/fileOutput</value> + </property> + <property> + <name>dt.application.fileOutput.operator.writer.prop.fileName</name> + <value>sequence</value> + </property> + <property> + <name>dt.application.fileOutput.operator.writer.attr.PARTITIONER</name> + <value>com.datatorrent.common.partitioner.StatelessPartitioner:3</value> + </property> + <property> + <name>dt.application.fileOutput.stream.data.locality</name> + <value>CONTAINER_LOCAL</value> + </property> +</configuration> + diff --git a/examples/fileIO/src/main/resources/unused-log4j.properties b/examples/fileIO/src/main/resources/unused-log4j.properties new file mode 100644 index 0000000000..625497b1c8 --- /dev/null +++ b/examples/fileIO/src/main/resources/unused-log4j.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +log4j.rootLogger=DEBUG,CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n + +log4j.logger.org=info +#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.com.datatorrent=info diff --git a/examples/fileIO/src/test/java/org/apache/apex/examples/fileIO/ApplicationTest.java b/examples/fileIO/src/test/java/org/apache/apex/examples/fileIO/ApplicationTest.java new file mode 100644 index 0000000000..ed3a5448c7 --- /dev/null +++ b/examples/fileIO/src/test/java/org/apache/apex/examples/fileIO/ApplicationTest.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.apache.apex.examples.fileIO;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+
+/**
+ * Test application in local mode.
+ */
+public class ApplicationTest
+{
+
+  private static final String baseDirName = "target/fileIO";
+  private static final String inputDirName = baseDirName + "/input-dir";
+  private static final String outputDirName = baseDirName + "/output-dir";
+  private static final File inputDirFile = new File(inputDirName);
+  private static final File outputDirFile = new File(outputDirName);
+
+  private static final int numFiles = 10;       // number of input files
+  private static final int numLines = 10;       // number of lines in each input file
+  private static final int numPartitions = 3;   // number of partitions of input operator
+
+  // create nFiles files with nLines lines in each
+  private void createFiles(final int nFiles, final int nLines) throws IOException
+  {
+    for (int file = 0; file < nFiles; file++) {
+      ArrayList<String> lines = new ArrayList<>();
+      for (int line = 0; line < nLines; line++) {
+        lines.add("file " + file + ", line " + line);
+      }
+      try {
+        FileUtils.write(new File(inputDirFile, "file" + file), StringUtils.join(lines, "\n"));
+      } catch (IOException e) {
+        System.out.format("Error: Failed to create file %s%n", file);
+        e.printStackTrace();
+      }
+    }
+    System.out.format("Created %d files with %d lines in each%n", nFiles, nLines);
+  }
+
+  private void cleanup()
+  {
+    try {
+      FileUtils.deleteDirectory(inputDirFile);
+      FileUtils.deleteDirectory(outputDirFile);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // check that the requisite number of files exist in the output directory
+  private boolean check(final int nFiles)
+  {
+    return nFiles == FileUtils.listFiles(outputDirFile, null, false).size();
+  }
+
+  // return Configuration with suitable properties set
+  private Configuration getConfig()
+  {
+    final Configuration result = new Configuration(false);
+    //conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+    result.set("dt.application.FileIO.operator.read.prop.directory", inputDirName);
+    result.setInt("dt.application.FileIO.operator.read.prop.partitionCount", numPartitions);
+    result.set("dt.application.FileIO.operator.write.prop.filePath", outputDirName);
+    return result;
+  }
+
+  @Before
+  public void beforeTest() throws Exception
+  {
+    cleanup();
+    FileUtils.forceMkdir(inputDirFile);
+    FileUtils.forceMkdir(outputDirFile);
+
+    // create some text files in input directory
+    createFiles(numFiles, numLines);
+  }
+
+  @After
+  public void afterTest()
+  {
+    cleanup();
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      lma.prepareDAG(new Application(), getConfig());
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // wait for output files to show up
+      while (!check(numFiles)) {
+        Thread.sleep(1000);
+      }
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}
diff --git a/examples/fileIO/src/test/java/org/apache/apex/examples/fileIO/ThroughputBasedApplicationTest.java b/examples/fileIO/src/test/java/org/apache/apex/examples/fileIO/ThroughputBasedApplicationTest.java
new file mode 100644
index 0000000000..2a33257db3
--- /dev/null
+++ b/examples/fileIO/src/test/java/org/apache/apex/examples/fileIO/ThroughputBasedApplicationTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.examples.fileIO;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+public class ThroughputBasedApplicationTest
+{
+  private static final String baseDirName = "target/fileIO";
+  private static final String inputDirName = baseDirName + "/input-dir";
+  private static final String outputDirName = baseDirName + "/output-dir";
+  private static final File inputDirFile = new File(inputDirName);
+  private static final File outputDirFile = new File(outputDirName);
+
+  private static final int numFiles = 10;       // number of input files
+  private static final int numLines = 10;       // number of lines in each input file
+  private static final int numPartitions = 4;   // number of partitions of input operator
+
+  // create nFiles files with nLines lines in each
+  private void createFiles(final int nFiles, final int nLines) throws IOException
+  {
+    for (int file = 0; file < nFiles; file++) {
+      ArrayList<String> lines = new ArrayList<>();
+      for (int line = 0; line < nLines; line++) {
+        lines.add("file " + file + ", line " + line);
+      }
+      try {
+        FileUtils.write(new File(inputDirFile, "file" + file), StringUtils.join(lines, "\n"));
+      } catch (IOException e) {
+        System.out.format("Error: Failed to create file %s%n", file);
+        e.printStackTrace();
+      }
+    }
+    System.out.format("Created %d files with %d lines in each%n", nFiles, nLines);
+  }
+
+  private void cleanup()
+  {
+    try {
+      FileUtils.deleteDirectory(inputDirFile);
+      FileUtils.deleteDirectory(outputDirFile);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // check that the requisite number of files exist in the output directory
+  private boolean checkFilesCopied(final int nFiles)
+  {
+    return (nFiles == FileUtils.listFiles(outputDirFile, null, false).size() && verifyFilesRenamed());
+  }
+
+  private boolean verifyFilesRenamed()
+  {
+    for (int i = 0; i < numFiles; i++) {
+      String fileName = "file" + i;
+      if (!new File(outputDirFile, fileName).exists()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // return Configuration with suitable properties set
+  private Configuration getConfig()
+  {
+    final Configuration result = new Configuration(false);
+    result.setInt("dt.application.ThroughputBasedFileIO.attr.CHECKPOINT_WINDOW_COUNT", 10);
+    result.set("dt.application.ThroughputBasedFileIO.operator.read.prop.directory", inputDirName);
+    result.setInt("dt.application.ThroughputBasedFileIO.operator.read.prop.pendingFilesPerOperator", 2);
+    result.setInt("dt.application.ThroughputBasedFileIO.operator.read.prop.partitionCount", numPartitions);
+    result.set("dt.application.ThroughputBasedFileIO.operator.write.prop.filePath", outputDirName);
+    return result;
+  }
+
+  @Before
+  public void beforeTest() throws Exception
+  {
+    cleanup();
+    FileUtils.forceMkdir(inputDirFile);
+    FileUtils.forceMkdir(outputDirFile);
+
+    // create some text files in input directory
+    createFiles(numFiles, numLines);
+  }
+
+  @After
+  public void afterTest()
+  {
+    cleanup();
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      lma.prepareDAG(new ThroughputBasedApplication(), getConfig());
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // wait for output files to show up
+      while (!checkFilesCopied(numFiles)) {
+        Thread.sleep(1000);
+      }
+      lc.shutdown();
+      verifyCopiedData();
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+  private void verifyCopiedData() throws IOException
+  {
+    for (int i = 0; i < numFiles; i++) {
+      String fileName = "file" + i;
+      Assert.assertTrue(FileUtils.contentEquals(new File(inputDirFile, fileName), new File(outputDirName, fileName)));
+    }
+  }
+}
diff --git a/examples/fileIO/src/test/java/org/apache/apex/examples/fileIOMultiDir/ApplicationTest.java b/examples/fileIO/src/test/java/org/apache/apex/examples/fileIOMultiDir/ApplicationTest.java
new file mode 100644
index 0000000000..1244d32910
--- /dev/null
+++ b/examples/fileIO/src/test/java/org/apache/apex/examples/fileIOMultiDir/ApplicationTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.examples.fileIOMultiDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+
+/**
+ * Test application in local mode.
+ */
+public class ApplicationTest
+{
+
+  private static final String baseDirName = "target/fileIO";
+  private static final String inputDirName1 = baseDirName + "/in1";
+  private static final String inputDirName2 = baseDirName + "/in2";
+  private static final String outputDirName = baseDirName + "/output-dir";
+  private static final File inputDirFile1 = new File(inputDirName1);
+  private static final File inputDirFile2 = new File(inputDirName2);
+  private static final File outputDirFile = new File(outputDirName);
+
+  private static final int numFiles = 10;       // number of input files
+  private static final int numLines = 10;       // number of lines in each input file
+  private static final int numPartitions = 3;   // number of partitions of input operator
+
+  // create 2 input files, one in each input directory
+  private void createFiles() throws IOException
+  {
+    String[] lines1 = {"line-1", "line-2"};
+    String[] lines2 = {"line-3", "line-4"};
+
+    try {
+      FileUtils.write(new File(inputDirFile1, "file1.txt"),
+          StringUtils.join(lines1, "\n"));
+      FileUtils.write(new File(inputDirFile2, "file2.txt"),
+          StringUtils.join(lines2, "\n"));
+    } catch (IOException e) {
+      System.out.format("Error: Failed to create input files%n");
+      e.printStackTrace();
+    }
+    System.out.format("Created 2 files with 2 lines in each%n");
+  }
+
+  private void cleanup()
+  {
+    try {
+      FileUtils.deleteDirectory(inputDirFile1);
+      FileUtils.deleteDirectory(inputDirFile2);
+      FileUtils.deleteDirectory(outputDirFile);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // check that both output files exist in the output directory
+  private boolean check(final int nFiles)
+  {
+    String[] ext = {"txt"};
+    Collection<File> list = FileUtils.listFiles(outputDirFile, ext, false);
+
+    if (list.size() < 2) {
+      return false;
+    }
+
+    // we have 2 files; check that the rename from .tmp to .txt has happened
+    int found = 0;
+    for (File f : list) {
+      String name = f.getName();
+      if (name.equals("file1.txt") || name.equals("file2.txt")) {
+        ++found;
+      }
+    }
+    return 2 == found;
+  }
+
+  // return Configuration with suitable properties set
+  private Configuration getConfig()
+  {
+    final Configuration result = new Configuration(false);
+    //conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-fileIOMultiDir.xml"));
+    result.set("dt.application.FileIO.operator.read.prop.directory", "dummy");
+    result.set("dt.application.FileIO.operator.read.prop.directories",
+        "\"target/fileIO/in1\",\"target/fileIO/in2\"");
+    result.set("dt.application.FileIO.operator.read.prop.partitionCounts", "2,3");
+    result.set("dt.application.FileIO.operator.write.prop.filePath", outputDirName);
+    return result;
+  }
+
+  @Before
+  public void beforeTest() throws Exception
+  {
+    cleanup();
+    FileUtils.forceMkdir(inputDirFile1);
+    FileUtils.forceMkdir(inputDirFile2);
+    FileUtils.forceMkdir(outputDirFile);
+
+    // create some text files in input directory
+    createFiles();
+  }
+
+  @After
+  public void afterTest()
+  {
+    cleanup();
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      lma.prepareDAG(new Application(), getConfig());
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // wait for output files to show up
+      while (!check(numFiles)) {
+        Thread.sleep(1000);
+      }
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}
diff --git a/examples/fileIO/src/test/java/org/apache/apex/examples/fileIOSimple/ApplicationTest.java b/examples/fileIO/src/test/java/org/apache/apex/examples/fileIOSimple/ApplicationTest.java
new file mode 100644
index 0000000000..fff96065ce
--- /dev/null
+++ b/examples/fileIO/src/test/java/org/apache/apex/examples/fileIOSimple/ApplicationTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.examples.fileIOSimple;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+
+/**
+ * Test application in local mode.
+ */
+public class ApplicationTest
+{
+
+  private static final String baseDirName = "target/SimpleFileIO";
+  private static final String inputDirName = baseDirName + "/input-dir";
+  private static final String outputDirName = baseDirName + "/output-dir";
+  private static final File inputDirFile = new File(inputDirName);
+  private static final File outputDirFile = new File(outputDirName);
+
+  private static final int numFiles = 10;       // number of input files
+  private static final int numLines = 10;       // number of lines in each input file
+  private static final int numPartitions = 3;   // number of partitions of input operator
+
+  // create nFiles files with nLines lines in each
+  private void createFiles(final int nFiles, final int nLines) throws IOException
+  {
+    for (int file = 0; file < nFiles; file++) {
+      ArrayList<String> lines = new ArrayList<>();
+      for (int line = 0; line < nLines; line++) {
+        lines.add("file " + file + ", line " + line);
+      }
+      try {
+        FileUtils.write(new File(inputDirFile, "file" + file), StringUtils.join(lines, "\n"));
+      } catch (IOException e) {
+        System.out.format("Error: Failed to create file %s%n", file);
+        e.printStackTrace();
+      }
+    }
+    System.out.format("Created %d files with %d lines in each%n", nFiles, nLines);
+  }
+
+  private void cleanup()
+  {
+    try {
+      FileUtils.deleteDirectory(inputDirFile);
+      FileUtils.deleteDirectory(outputDirFile);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // check that at least one output file exists in the output directory
+  private boolean check()
+  {
+    // Look for files with a single digit extension
+    String[] ext = {"0","1","2","3","4","5","6","7","8","9"};
+    Collection<File> list = FileUtils.listFiles(outputDirFile, ext, false);
+
+    return !list.isEmpty();
+  }
+
+  // return Configuration with suitable properties set
+  private Configuration getConfig()
+  {
+    final Configuration result = new Configuration(false);
+    //conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties-fileIOSimple.xml"));
+    result.set("dt.application.SimpleFileIO.operator.input.prop.directory", inputDirName);
+    result.set("dt.application.SimpleFileIO.operator.output.prop.filePath", outputDirName);
+    result.set("dt.application.SimpleFileIO.operator.output.prop.fileName", "myfile");
+    result.setInt("dt.application.SimpleFileIO.operator.output.prop.maxLength", 1000);
+    return result;
+  }
+
+  @Before
+  public void beforeTest() throws Exception
+  {
+    cleanup();
+    FileUtils.forceMkdir(inputDirFile);
+    FileUtils.forceMkdir(outputDirFile);
+
+    // create some text files in input directory
+    createFiles(numFiles, numLines);
+  }
+
+  @After
+  public void afterTest()
+  {
+    cleanup();
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      lma.prepareDAG(new Application(), getConfig());
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // wait for output files to show up
+      while (!check()) {
+        Thread.sleep(1000);
+      }
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}
diff --git a/examples/fileIO/src/test/java/org/apache/apex/examples/fileOutput/ApplicationTest.java b/examples/fileIO/src/test/java/org/apache/apex/examples/fileOutput/ApplicationTest.java
new file mode 100644
index 0000000000..ad8c323e64
--- /dev/null
+++ b/examples/fileIO/src/test/java/org/apache/apex/examples/fileOutput/ApplicationTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.examples.fileOutput;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+
+import javax.validation.ConstraintViolationException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.LocalMode;
+
+
+/**
+ * Test application in local mode.
+ */
+public class ApplicationTest
+{
+
+  private static final String baseDirName = "target/fileOutput";
+  private static final String outputDirName = baseDirName + "/output-dir";
+  private static final File outputDirFile = new File(outputDirName);
+
+  private void cleanup()
+  {
+    try {
+      FileUtils.deleteDirectory(outputDirFile);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  // check that at least one output file exists in the output directory
+  private boolean check()
+  {
+    // Look for files with a single digit extension
+    String[] ext = {"0","1","2","3","4","5","6","7","8","9"};
+    Collection<File> list = FileUtils.listFiles(outputDirFile, ext, false);
+
+    return !list.isEmpty();
+  }
+
+  // return Configuration with suitable properties set
+  private Configuration getConfig()
+  {
+    final Configuration result = new Configuration(false);
+    result.addResource(this.getClass().getResourceAsStream("/META-INF/properties-fileOutput.xml"));
+    result.setInt("dt.application.fileOutput.operator.generator.prop.divisor", 3);
+    result.set("dt.application.fileOutput.operator.writer.prop.filePath", outputDirName);
+    return result;
+  }
+
+  @Before
+  public void beforeTest() throws Exception
+  {
+    cleanup();
+    FileUtils.forceMkdir(outputDirFile);
+  }
+
+  @After
+  public void afterTest()
+  {
+    cleanup();
+  }
+
+  @Test
+  public void testApplication() throws Exception
+  {
+    try {
+      LocalMode lma = LocalMode.newInstance();
+      lma.prepareDAG(new Application(), getConfig());
+      LocalMode.Controller lc = lma.getController();
+      lc.runAsync();
+
+      // wait for output files to show up
+      while (!check()) {
+        Thread.sleep(1000);
+      }
+    } catch (ConstraintViolationException e) {
+      Assert.fail("constraint violations: " + e.getConstraintViolations());
+    }
+  }
+
+}
diff --git a/examples/fileIO/src/test/resources/log4j.properties b/examples/fileIO/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..0a1b8cba5b
--- /dev/null
+++ b/examples/fileIO/src/test/resources/log4j.properties
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=info
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=debug
diff --git a/examples/pom.xml b/examples/pom.xml
index 180d7c96bd..69af4bb413 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -200,6 +200,7 @@
     <module>kafka</module>
     <module>ftp</module>
    <module>s3</module>
+    <module>fileIO</module>
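A note on the tests above: each one polls `check()` in an unbounded `while` loop, so a broken application hangs the build until the surefire timeout (if any) fires. A bounded wait is a common hardening. The sketch below is illustrative only and is not part of this patch; the class and method names (`WaitUtil`, `waitFor`) are invented for the example, and it uses only the JDK.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll a condition with an overall deadline instead of
// looping forever. The test would call
//   Assert.assertTrue(WaitUtil.waitFor(() -> check(numFiles), 60_000, 1000));
// in place of the unbounded while loop.
public class WaitUtil
{
  /**
   * Polls {@code condition} every {@code intervalMs} milliseconds until it
   * returns true or {@code timeoutMs} milliseconds have elapsed.
   * Returns true iff the condition became true within the timeout.
   */
  public static boolean waitFor(BooleanSupplier condition, long timeoutMs, long intervalMs)
      throws InterruptedException
  {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(intervalMs);
    }
    // one final check so a condition that turned true at the deadline still passes
    return condition.getAsBoolean();
  }

  public static void main(String[] args) throws InterruptedException
  {
    final long start = System.currentTimeMillis();
    // condition becomes true after roughly 300 ms; 5 s budget is ample
    boolean ok = WaitUtil.waitFor(() -> System.currentTimeMillis() - start > 300, 5000, 50);
    System.out.println(ok);
    // condition that never becomes true times out and returns false
    boolean timedOut = WaitUtil.waitFor(() -> false, 200, 50);
    System.out.println(timedOut);
  }
}
```

With such a helper the test fails fast with a clear assertion message instead of hanging when the output files never appear.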