From 6c993a4d70e2d2f3714d840e91911f164e1d1d85 Mon Sep 17 00:00:00 2001 From: "shiju.john" Date: Tue, 29 Nov 2016 16:32:41 +0530 Subject: [PATCH] Linear Waiting for the File Scan and updated Test cases --- .../com/flytxt/tp/processor/FlyReader.java | 89 +++++++--- .../parser/processor/FlyReaderTest.java | 154 ++++++++++++++++++ .../processor/filefilter/FifoFilterTest.java | 11 +- .../LastModifiedWindowFilterTest.java | 14 +- .../processor/filefilter/RegexFilterTest.java | 29 +++- 5 files changed, 255 insertions(+), 42 deletions(-) diff --git a/processor/src/main/java/com/flytxt/tp/processor/FlyReader.java b/processor/src/main/java/com/flytxt/tp/processor/FlyReader.java index 97c7e2b..29656b0 100644 --- a/processor/src/main/java/com/flytxt/tp/processor/FlyReader.java +++ b/processor/src/main/java/com/flytxt/tp/processor/FlyReader.java @@ -28,6 +28,10 @@ public class FlyReader implements Callable { private LineProcessor lp; private boolean stopRequested; + + private long waitTime = 0; + + private static final long MAX_WAIT_TIME = 60000; public enum Status { RUNNING, TERMINATED, SHUTTINGDOWN @@ -58,42 +62,73 @@ public void run() { assert Files.exists(folderP); status = Status.RUNNING; - while (!stopRequested) { - - //try (DirectoryStream directoryStream = Files.newDirectoryStream(Paths.get(folder), fileFilter)) { - try{ - - FileIterator directoryStream= fileFilter.iterator(); - if(null!=directoryStream){ - for (final Path path : directoryStream) { - try { - appLog.debug("picked up " + path.toString()); - buf.clear(); - String fileName = path.getFileName().toString(); - lp.getMf().getCurrentObject().init(folder, fileName); - BasicFileAttributes attr = Files.readAttributes(path, BasicFileAttributes.class); - lp.init(fileName, attr.lastModifiedTime().toMillis()); - processFile(path); - Files.delete(path); - lastProcessedFile = fileName; - if (stopRequested) { - lp.preDestroy(); - appLog.debug("shutting down Worker @ :" + folder); - break; + try { + while (canProcess(fileFilter.iterator())) { + + //try (DirectoryStream directoryStream = Files.newDirectoryStream(Paths.get(folder), fileFilter)) { + try{ + + FileIterator directoryStream= fileFilter.iterator(); + if(null!=directoryStream){ + for (final Path path : directoryStream) { + try { + appLog.debug("picked up " + path.toString()); + buf.clear(); + String fileName = path.getFileName().toString(); + lp.getMf().getCurrentObject().init(folder, fileName); + BasicFileAttributes attr = Files.readAttributes(path, BasicFileAttributes.class); + lp.init(fileName, attr.lastModifiedTime().toMillis()); + processFile(path); + Files.delete(path); + lastProcessedFile = fileName; + if (stopRequested) { + lp.preDestroy(); + appLog.debug("shutting down Worker @ :" + folder); + break; + } + } catch (final OverlappingFileLockException e) { + appLog.error("Could not process " + path.toString(), e); } - } catch (final OverlappingFileLockException e) { - appLog.error("Could not process " + path.toString(), e); } } + fileFilter.refresh(); + } catch (final Exception ex) { + ex.printStackTrace(); } - fileFilter.refresh(); - } catch (final Exception ex) { - ex.printStackTrace(); } + } catch (InterruptedException e) { + appLog.error(" Thread interrupted Going to shutdwon the Process " + e.getMessage()); } status = Status.TERMINATED; appLog.debug("Worker down " + lp.getSourceFolder()); } + + /** + * + * @param iterator + * @return + * @throws InterruptedException + */ + private boolean canProcess(FileIterator iterator) throws InterruptedException{ + if(stopRequested){ + waitTime = 0; + return false; + }else{ + if(iterator.hasNext()){ + waitTime = 0; + return true; + }else{ + try { + if(waitTime>=MAX_WAIT_TIME) + waitTime = 0; + Thread.sleep(waitTime++); + return true; + } catch (InterruptedException e) { + throw e; + } + } + } + } private void processFile(final Path path) throws Exception { final long t1 = System.currentTimeMillis(); diff --git a/processor/src/test/java/com/flytxt/parser/processor/FlyReaderTest.java b/processor/src/test/java/com/flytxt/parser/processor/FlyReaderTest.java index 65b8072..4c17209 100644 --- a/processor/src/test/java/com/flytxt/parser/processor/FlyReaderTest.java +++ b/processor/src/test/java/com/flytxt/parser/processor/FlyReaderTest.java @@ -2,23 +2,40 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.nio.file.Files; import java.nio.file.LinkOption; import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.Map; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import com.flytxt.tp.processor.FlyReader; import com.flytxt.tp.processor.LineProcessor; +import com.flytxt.tp.processor.LineProcessorImpl; +import com.flytxt.tp.processor.filefilter.FilterChainBuilder; +import com.flytxt.tp.processor.filefilter.FilterParameters; +import com.flytxt.tp.processor.filefilter.FlyFileFilter; import test.TestScript; public class FlyReaderTest { + + + private static final String DATA_FILE_PATH ="src"+File.separator+"test"+File.separator+"resources"+File.separator+"test-data"; + FlyReader fr = new FlyReader(); Method processFile; java.nio.file.Path path; + + @Before public void init(){ LineProcessor lp = new TestScript(); @@ -42,6 +59,8 @@ public void init(){ @Test public void test(){ try { + + processFile.invoke(fr, path); } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { // TODO Auto-generated catch block @@ -49,5 +68,140 @@ public void test(){ } } + + @Test + public void test_call(){ + try { + + createTestData(); + fr.set(LineProcessorImpl.SOURCE_PTH, new LineProcessorImpl(), getFilter()); + startProcess(); + stopProcess(); + Thread.sleep(10000); + + } catch (Exception e) { + org.junit.Assert.fail(e.getMessage()); + } + + } + + private void stopProcess() { + new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(2000); + fr.preDestroy(); + Boolean stpValue = (Boolean)getFiledValue("stopRequested"); + Assert.assertTrue(stpValue); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + }).start(); + + } + + + private Object getFiledValue(String filedName) { + try { + Field field = fr.getClass().getDeclaredField(filedName); + field.setAccessible(true); + return field.get(fr); + } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) { + Assert.fail(e.getMessage()); + return null; + } + } + + + private void startProcess() { + new Thread(new Runnable() { + @Override + public void run() { + try { + fr.call(); + } catch (Exception e) { + e.printStackTrace(); + } + + } + }).start(); + } + + private void createTestData() { + File directory = new File(LineProcessorImpl.SOURCE_PTH); + if(!directory.exists()){ + directory.mkdirs(); + } + + String sourcePath = LineProcessorImpl.SOURCE_PTH +File.separator+"test-data"; + File file = new File(sourcePath); + if(!file.exists()){ + try { + Files.copy(Paths.get(DATA_FILE_PATH), Paths.get(sourcePath), StandardCopyOption.REPLACE_EXISTING); + } catch (IOException e) { + org.junit.Assert.fail("Unable to copy test data"); + } + } + } + + + + + /** + * + * @return + */ + public FlyFileFilter getFilter(){ + + FlyFileFilter fileFilter = new FlyFileFilter(); + FilterChainBuilder chainbuilder = new FilterChainBuilder(); + setFilterParameterMap(chainbuilder); + Map filterValues = new HashMap<>(); + filterValues.put("Filter_Name1", "" + + "com.flytxt.tp.processor.filefilter.LastModifiedWindowFilter," + + "com.flytxt.tp.processor.filefilter.RegexFilter," + + "com.flytxt.tp.processor.filefilter.FifoFilter"); + chainbuilder.setFilterNameMap(filterValues); + chainbuilder.build(); + + Field chainBuilderField; + try { + chainBuilderField = FlyFileFilter.class.getDeclaredField("builder"); + chainBuilderField.setAccessible(true); + chainBuilderField.set(fileFilter, chainbuilder); + } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) { + e.printStackTrace(); + } + fileFilter.set(LineProcessorImpl.SOURCE_PTH,"Filter_Name1"); + return fileFilter; + } + + /** + * Filter Parameters + * @param chainbuilder + */ + private void setFilterParameterMap(FilterChainBuilder chainbuilder) { + try { + + FilterParameters filterParameters = new FilterParameters(); + Map parameterMap = new HashMap<>(); + parameterMap.put("com.flytxt.tp.processor.filefilter.LastModifiedWindowFilter", "0"); + parameterMap.put("com.flytxt.tp.processor.filefilter.RegexFilter", ".*\\.txt"); + Map> filterNamedMap = new HashMap<>(); + filterNamedMap.put("Filter_Name1", parameterMap); + filterParameters.setArgMap(filterNamedMap); + + Field filterParametersField = FilterChainBuilder.class.getDeclaredField("filterParameters"); + filterParametersField.setAccessible(true); + + filterParametersField.set(chainbuilder, filterParameters); + + } catch (NoSuchFieldException | SecurityException | IllegalArgumentException | IllegalAccessException e) { + e.printStackTrace(); + } + } } diff --git a/processor/src/test/java/com/flytxt/tp/processor/filefilter/FifoFilterTest.java b/processor/src/test/java/com/flytxt/tp/processor/filefilter/FifoFilterTest.java index c9e848e..dc0bdb2 100644 --- a/processor/src/test/java/com/flytxt/tp/processor/filefilter/FifoFilterTest.java +++ b/processor/src/test/java/com/flytxt/tp/processor/filefilter/FifoFilterTest.java @@ -2,6 +2,7 @@ import java.io.File; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -26,9 +27,15 @@ public void init(){ @Test public void testPositive(){ File[] okFiles = fifo.canProcess(files); + File preFile = null; for(File aFile: okFiles){ - if(null!=aFile) - System.out.println(aFile.getName()); + if(null!=aFile){ + Assert.assertNotNull(aFile.getName()); + if(preFile!=null){ + Assert.assertTrue(aFile.lastModified() >= preFile.lastModified()); + } + preFile =aFile; + } } } } diff --git a/processor/src/test/java/com/flytxt/tp/processor/filefilter/LastModifiedWindowFilterTest.java b/processor/src/test/java/com/flytxt/tp/processor/filefilter/LastModifiedWindowFilterTest.java index 7f20e00..ea668dd 100644 --- a/processor/src/test/java/com/flytxt/tp/processor/filefilter/LastModifiedWindowFilterTest.java +++ b/processor/src/test/java/com/flytxt/tp/processor/filefilter/LastModifiedWindowFilterTest.java @@ -2,6 +2,7 @@ import java.io.File; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -9,14 +10,16 @@ public class LastModifiedWindowFilterTest{ LastModifiedWindowFilter lm = new LastModifiedWindowFilter(); File [] files = new File[10]; + long currentTime = 0; @Before public void init(){ + currentTime = System.currentTimeMillis(); for(int i =0; i < 10; i++){ File mockedFile = Mockito.mock(File.class); Mockito.when(mockedFile.getName()).thenReturn(i+".txt"); if(i%2==0) - Mockito.when(mockedFile.lastModified()).thenReturn(System.currentTimeMillis()-500000); + Mockito.when(mockedFile.lastModified()).thenReturn(currentTime -50000); else Mockito.when(mockedFile.lastModified()).thenReturn(System.currentTimeMillis()); files[i] = mockedFile; @@ -24,12 +27,15 @@ public void init(){ } @Test - public void testPositive(){ + public void testPositive(){ lm.windowUnit = 50000; File[] okFiles = lm.canProcess(files); for(File aFile: okFiles){ - if(aFile != null)System.out.println(aFile.getName()); - } + if(aFile != null){ + Assert.assertNotNull(aFile.getName()); + Assert.assertTrue(aFile.lastModified() >= (currentTime-50000)); + } + } } } diff --git a/processor/src/test/java/com/flytxt/tp/processor/filefilter/RegexFilterTest.java b/processor/src/test/java/com/flytxt/tp/processor/filefilter/RegexFilterTest.java index 07be616..7ce4975 100644 --- a/processor/src/test/java/com/flytxt/tp/processor/filefilter/RegexFilterTest.java +++ b/processor/src/test/java/com/flytxt/tp/processor/filefilter/RegexFilterTest.java @@ -1,11 +1,8 @@ package com.flytxt.tp.processor.filefilter; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertTrue; - import java.io.File; -import java.util.Arrays; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; @@ -16,9 +13,15 @@ public class RegexFilterTest { @Before public void init(){ for(int i =0; i < 10; i++){ - File mockedFile = Mockito.mock(File.class); - Mockito.when(mockedFile.getName()).thenReturn(i+".txt"); - files[i] = mockedFile; + File mockedFile = null; + if(i%2==0){ + mockedFile = Mockito.mock(File.class); + Mockito.when(mockedFile.getName()).thenReturn(i+".txt"); + }else{ + mockedFile = Mockito.mock(File.class); + Mockito.when(mockedFile.getName()).thenReturn(i+".csv"); + } + files[i] = mockedFile; } } @@ -27,7 +30,12 @@ public void testPositive(){ String regex = ".*\\.txt"; rf.setRegex(regex); File[] okFiles = rf.canProcess(files); - //TODO + for(File file : okFiles){ + if(null!=file) + Assert.assertTrue(file.getName().contains(".txt")); + } + + } @Test @@ -35,7 +43,10 @@ public void testNegative(){ String regex = ".*\\.csv"; rf.setRegex(regex); File[] okFiles = rf.canProcess(files); - //TODO + for(File file : okFiles){ + if(null!=file) + Assert.assertTrue(file.getName().contains(".csv")); + } } }