Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 62 additions & 27 deletions processor/src/main/java/com/flytxt/tp/processor/FlyReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class FlyReader implements Callable<FlyReader> {
private LineProcessor lp;

private boolean stopRequested;

private long waitTime = 0;

private static final long MAX_WAIT_TIME = 60000;

public enum Status {
RUNNING, TERMINATED, SHUTTINGDOWN
Expand Down Expand Up @@ -58,42 +62,73 @@ public void run() {
assert Files.exists(folderP);

status = Status.RUNNING;
while (!stopRequested) {

//try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(folder), fileFilter)) {
try{

FileIterator<Path> directoryStream= fileFilter.iterator();
if(null!=directoryStream){
for (final Path path : directoryStream) {
try {
appLog.debug("picked up " + path.toString());
buf.clear();
String fileName = path.getFileName().toString();
lp.getMf().getCurrentObject().init(folder, fileName);
BasicFileAttributes attr = Files.readAttributes(path, BasicFileAttributes.class);
lp.init(fileName, attr.lastModifiedTime().toMillis());
processFile(path);
Files.delete(path);
lastProcessedFile = fileName;
if (stopRequested) {
lp.preDestroy();
appLog.debug("shutting down Worker @ :" + folder);
break;
try {
while (canProcess(fileFilter.iterator())) {

//try (DirectoryStream<Path> directoryStream = Files.newDirectoryStream(Paths.get(folder), fileFilter)) {
try{

FileIterator<Path> directoryStream= fileFilter.iterator();
if(null!=directoryStream){
for (final Path path : directoryStream) {
try {
appLog.debug("picked up " + path.toString());
buf.clear();
String fileName = path.getFileName().toString();
lp.getMf().getCurrentObject().init(folder, fileName);
BasicFileAttributes attr = Files.readAttributes(path, BasicFileAttributes.class);
lp.init(fileName, attr.lastModifiedTime().toMillis());
processFile(path);
Files.delete(path);
lastProcessedFile = fileName;
if (stopRequested) {
lp.preDestroy();
appLog.debug("shutting down Worker @ :" + folder);
break;
}
} catch (final OverlappingFileLockException e) {
appLog.error("Could not process " + path.toString(), e);
}
} catch (final OverlappingFileLockException e) {
appLog.error("Could not process " + path.toString(), e);
}
}
fileFilter.refresh();
} catch (final Exception ex) {
ex.printStackTrace();
}
fileFilter.refresh();
} catch (final Exception ex) {
ex.printStackTrace();
}
} catch (InterruptedException e) {
appLog.error(" Thread interrupted Going to shutdwon the Process " + e.getMessage());
}
status = Status.TERMINATED;
appLog.debug("Worker down " + lp.getSourceFolder());
}

/**
*
* @param iterator
* @return
* @throws InterruptedException
*/
private boolean canProcess(FileIterator<Path> iterator) throws InterruptedException{
if(stopRequested){
waitTime = 0;
return false;
}else{
if(iterator.hasNext()){
waitTime = 0;
return true;
}else{
try {
if(waitTime>=MAX_WAIT_TIME)
waitTime = 0;
Thread.sleep(waitTime++);
return true;
} catch (InterruptedException e) {
throw e;
}
}
}
}

private void processFile(final Path path) throws Exception {
final long t1 = System.currentTimeMillis();
Expand Down
154 changes: 154 additions & 0 deletions processor/src/test/java/com/flytxt/parser/processor/FlyReaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,40 @@

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.flytxt.tp.processor.FlyReader;
import com.flytxt.tp.processor.LineProcessor;
import com.flytxt.tp.processor.LineProcessorImpl;
import com.flytxt.tp.processor.filefilter.FilterChainBuilder;
import com.flytxt.tp.processor.filefilter.FilterParameters;
import com.flytxt.tp.processor.filefilter.FlyFileFilter;

import test.TestScript;

public class FlyReaderTest {


private static final String DATA_FILE_PATH ="src"+File.separator+"test"+File.separator+"resources"+File.separator+"test-data";

FlyReader fr = new FlyReader();
Method processFile;
java.nio.file.Path path;


@Before
public void init(){
LineProcessor lp = new TestScript();
Expand All @@ -42,12 +59,149 @@ public void init(){
@Test
public void test(){
try {


processFile.invoke(fr, path);
} catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

@Test
public void test_call(){
try {

createTestData();
fr.set(LineProcessorImpl.SOURCE_PTH, new LineProcessorImpl(), getFilter());
startProcess();
stopProcess();
Thread.sleep(10000);

} catch (Exception e) {
org.junit.Assert.fail(e.getMessage());
}

}

private void stopProcess() {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
fr.preDestroy();
Boolean stpValue = (Boolean)getFiledValue("stopRequested");
Assert.assertTrue(stpValue);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}).start();

}


private Object getFiledValue(String filedName) {
try {
Field field = fr.getClass().getDeclaredField(filedName);
field.setAccessible(true);
return field.get(fr);
} catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
Assert.fail(e.getMessage());
return null;
}
}


private void startProcess() {
new Thread(new Runnable() {
@Override
public void run() {
try {
fr.call();
} catch (Exception e) {
e.printStackTrace();
}

}
}).start();
}

private void createTestData() {
File directory = new File(LineProcessorImpl.SOURCE_PTH);
if(!directory.exists()){
directory.mkdirs();
}

String sourcePath = LineProcessorImpl.SOURCE_PTH +File.separator+"test-data";
File file = new File(sourcePath);
if(!file.exists()){
try {
Files.copy(Paths.get(DATA_FILE_PATH), Paths.get(sourcePath), StandardCopyOption.REPLACE_EXISTING);
} catch (IOException e) {
org.junit.Assert.fail("Unable to copy test data");
}
}
}




/**
*
* @return
*/
public FlyFileFilter getFilter(){

FlyFileFilter fileFilter = new FlyFileFilter();
FilterChainBuilder chainbuilder = new FilterChainBuilder();
setFilterParameterMap(chainbuilder);
Map<String, String> filterValues = new HashMap<>();
filterValues.put("Filter_Name1", ""
+ "com.flytxt.tp.processor.filefilter.LastModifiedWindowFilter,"
+ "com.flytxt.tp.processor.filefilter.RegexFilter,"
+ "com.flytxt.tp.processor.filefilter.FifoFilter");
chainbuilder.setFilterNameMap(filterValues);
chainbuilder.build();

Field chainBuilderField;
try {
chainBuilderField = FlyFileFilter.class.getDeclaredField("builder");
chainBuilderField.setAccessible(true);
chainBuilderField.set(fileFilter, chainbuilder);
} catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
e.printStackTrace();
}
fileFilter.set(LineProcessorImpl.SOURCE_PTH,"Filter_Name1");
return fileFilter;
}

/**
* Filter Parameters
* @param chainbuilder
*/
private void setFilterParameterMap(FilterChainBuilder chainbuilder) {
try {

FilterParameters filterParameters = new FilterParameters();
Map<String, String> parameterMap = new HashMap<>();
parameterMap.put("com.flytxt.tp.processor.filefilter.LastModifiedWindowFilter", "0");
parameterMap.put("com.flytxt.tp.processor.filefilter.RegexFilter", ".*\\.txt");
Map<String,Map<String,String>> filterNamedMap = new HashMap<>();
filterNamedMap.put("Filter_Name1", parameterMap);
filterParameters.setArgMap(filterNamedMap);

Field filterParametersField = FilterChainBuilder.class.getDeclaredField("filterParameters");
filterParametersField.setAccessible(true);

filterParametersField.set(chainbuilder, filterParameters);

} catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) {
e.printStackTrace();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.io.File;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
Expand All @@ -26,9 +27,15 @@ public void init(){
@Test
public void testPositive(){
File[] okFiles = fifo.canProcess(files);
File preFile = null;
for(File aFile: okFiles){
if(null!=aFile)
System.out.println(aFile.getName());
if(null!=aFile){
Assert.assertNotNull(aFile.getName());
if(preFile!=null){
Assert.assertTrue(aFile.lastModified() >= preFile.lastModified());
}
preFile =aFile;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,40 @@

import java.io.File;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

public class LastModifiedWindowFilterTest{
LastModifiedWindowFilter lm = new LastModifiedWindowFilter();
File [] files = new File[10];
long currentTime = 0;

@Before
public void init(){
currentTime = System.currentTimeMillis();
for(int i =0; i < 10; i++){
File mockedFile = Mockito.mock(File.class);
Mockito.when(mockedFile.getName()).thenReturn(i+".txt");
if(i%2==0)
Mockito.when(mockedFile.lastModified()).thenReturn(System.currentTimeMillis()-500000);
Mockito.when(mockedFile.lastModified()).thenReturn(currentTime -50000);
else
Mockito.when(mockedFile.lastModified()).thenReturn(System.currentTimeMillis());
files[i] = mockedFile;
}
}

@Test
public void testPositive(){
public void testPositive(){
lm.windowUnit = 50000;
File[] okFiles = lm.canProcess(files);
for(File aFile: okFiles){
if(aFile != null)System.out.println(aFile.getName());
}
if(aFile != null){
Assert.assertNotNull(aFile.getName());
Assert.assertTrue(aFile.lastModified() >= (currentTime-50000));
}
}
}

}
Loading