Skip to content

Commit 7aaef1e

Browse files
artembilangaryrussell
authored andcommitted
GH-3554: Eval remote dir on each synchToLocal (#3556)
* GH-3554: Eval remote dir on each synchToLocal Fixes #3554 The `remoteDirectoryExpression` was introduced into an `AbstractInboundFileSynchronizer` to let end-user to evaluate a remote directory on each poll (essentially on each `synchronizeToLocalDirectory()` call). The fix for the https://jira.spring.io/browse/INT-4491 introduced a regression when such an expression was evaluated only once when we call a `setRemoteDirectory()`. So, an original purpose of this option was lost and we don't get an actual remote dir on each poll * Remove `evaluateRemoteDirectory()` method and its usage since it doesn't reflect expectation of the remote dir expression property * Reinstate the `remoteDirectoryExpression` evaluation in the `synchronizeToLocalDirectory()` * Propagate the result of that expression into further methods for copying files * Remove setting of the `remoteDirectory` variable into a global `EvaluationContext` of the `AbstractInboundFileSynchronizer` instance since it is not thread-safe * Instead create an `EvaluationContext` locally for each `synchronizeToLocalDirectory()` call and set the `remoteDirectory` variable into this scoped instances * Generate a local file name from the `localFilenameGeneratorExpression` against locally created `EvaluationContext` with the mentioned `remoteDirectory` variable * Cover the expected functionality with a unit-test **Cherry-pick to `5.4.x` & `5.3.x`** * * Fix `testRemoteDirectoryRefreshedOnEachSynchronization` according PR review
1 parent 7da58b3 commit 7aaef1e

File tree

3 files changed

+102
-44
lines changed

3 files changed

+102
-44
lines changed

spring-integration-file/src/main/java/org/springframework/integration/file/remote/synchronizer/AbstractInboundFileSynchronizer.java

Lines changed: 33 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -102,11 +102,6 @@ public abstract class AbstractInboundFileSynchronizer<F>
102102
*/
103103
private Expression remoteDirectoryExpression;
104104

105-
/**
106-
* The current evaluation of the expression.
107-
*/
108-
private String evaluatedRemoteDirectory;
109-
110105
/**
111106
* An {@link FileListFilter} that runs against the <em>remote</em> file system view.
112107
*/
@@ -203,7 +198,6 @@ public void setTemporaryFileSuffix(String temporaryFileSuffix) {
203198
*/
204199
public void setRemoteDirectory(String remoteDirectory) {
205200
this.remoteDirectoryExpression = new LiteralExpression(remoteDirectory);
206-
evaluateRemoteDirectory();
207201
}
208202

209203
/**
@@ -229,7 +223,6 @@ public void setRemoteDirectoryExpressionString(String remoteDirectoryExpression)
229223
protected final void doSetRemoteDirectoryExpression(Expression expression) {
230224
Assert.notNull(expression, "'remoteDirectoryExpression' must not be null");
231225
this.remoteDirectoryExpression = expression;
232-
evaluateRemoteDirectory();
233226
}
234227

235228
/**
@@ -298,7 +291,6 @@ public final void afterPropertiesSet() {
298291
if (this.evaluationContext == null) {
299292
this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
300293
}
301-
evaluateRemoteDirectory();
302294
if (!StringUtils.hasText(this.metadataStorePrefix)) {
303295
this.metadataStorePrefix = this.name;
304296
}
@@ -341,26 +333,27 @@ public void synchronizeToLocalDirectory(final File localDirectory, final int max
341333
}
342334
return;
343335
}
336+
String remoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext, String.class);
344337
if (this.logger.isTraceEnabled()) {
345-
this.logger.trace("Synchronizing " + this.evaluatedRemoteDirectory + " to " + localDirectory);
338+
this.logger.trace("Synchronizing " + remoteDirectory + " to " + localDirectory);
346339
}
347340
try {
348341
int transferred = this.remoteFileTemplate.execute(session ->
349-
transferFilesFromRemoteToLocal(localDirectory, maxFetchSize, session));
342+
transferFilesFromRemoteToLocal(remoteDirectory, localDirectory, maxFetchSize, session));
350343
if (this.logger.isDebugEnabled()) {
351-
this.logger.debug(transferred + " files transferred from '" + this.evaluatedRemoteDirectory + "'");
344+
this.logger.debug(transferred + " files transferred from '" + remoteDirectory + "'");
352345
}
353346
}
354347
catch (Exception e) {
355348
throw new MessagingException("Problem occurred while synchronizing '"
356-
+ this.evaluatedRemoteDirectory + "' to local directory", e);
349+
+ remoteDirectory + "' to local directory", e);
357350
}
358351
}
359352

360-
private Integer transferFilesFromRemoteToLocal(File localDirectory, int maxFetchSize, Session<F> session)
361-
throws IOException {
353+
private Integer transferFilesFromRemoteToLocal(String remoteDirectory, File localDirectory,
354+
int maxFetchSize, Session<F> session) throws IOException {
362355

363-
F[] files = session.list(this.evaluatedRemoteDirectory);
356+
F[] files = session.list(remoteDirectory);
364357
if (!ObjectUtils.isEmpty(files)) {
365358
files = FileUtils.purgeUnwantedElements(files, e -> !isFile(e), this.comparator);
366359
}
@@ -372,6 +365,12 @@ private Integer transferFilesFromRemoteToLocal(File localDirectory, int maxFetch
372365
int copied = filteredFiles.size();
373366
int accepted = 0;
374367

368+
EvaluationContext localFileEvaluationContext = null;
369+
if (this.localFilenameGeneratorExpression != null) {
370+
localFileEvaluationContext = ExpressionUtils.createStandardEvaluationContext(this.beanFactory);
371+
localFileEvaluationContext.setVariable("remoteDirectory", remoteDirectory);
372+
}
373+
375374
for (F file : filteredFiles) {
376375
if (filteringOneByOne) {
377376
if ((maxFetchSize < 0 || accepted < maxFetchSize) && this.filter
@@ -383,7 +382,9 @@ private Integer transferFilesFromRemoteToLocal(File localDirectory, int maxFetch
383382
copied--;
384383
}
385384
}
386-
copied = copyIfNotNull(localDirectory, session, filteringOneByOne, filteredFiles, copied, file);
385+
copied =
386+
copyIfNotNull(remoteDirectory, localDirectory, localFileEvaluationContext, session,
387+
filteringOneByOne, filteredFiles, copied, file);
387388
}
388389
return copied;
389390
}
@@ -392,13 +393,16 @@ private Integer transferFilesFromRemoteToLocal(File localDirectory, int maxFetch
392393
}
393394
}
394395

395-
private int copyIfNotNull(File localDirectory, Session<F> session, boolean filteringOneByOne, List<F> filteredFiles,
396-
int copied, @Nullable F file) throws IOException {
396+
private int copyIfNotNull(String remoteDirectory, File localDirectory,
397+
@Nullable EvaluationContext localFileEvaluationContext, Session<F> session, boolean filteringOneByOne,
398+
List<F> filteredFiles, int copied, @Nullable F file) throws IOException {
397399

398400
boolean renamedFailed = false;
399401
try {
400-
if (file != null && !copyFileToLocalDirectory(this.evaluatedRemoteDirectory, file,
401-
localDirectory, session)) {
402+
if (file != null &&
403+
!copyFileToLocalDirectory(remoteDirectory, localFileEvaluationContext, file, localDirectory,
404+
session)) {
405+
402406
renamedFailed = true;
403407
}
404408
}
@@ -440,11 +444,12 @@ protected void rollbackFromFileToListEnd(List<F> filteredFiles, F file) {
440444
}
441445
}
442446

443-
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, // NOSONAR
444-
File localDirectory, Session<F> session) throws IOException {
447+
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, // NOSONAR
448+
@Nullable EvaluationContext localFileEvaluationContext, F remoteFile, File localDirectory,
449+
Session<F> session) throws IOException {
445450

446451
String remoteFileName = getFilename(remoteFile);
447-
String localFileName = generateLocalFileName(remoteFileName);
452+
String localFileName = generateLocalFileName(remoteFileName, localFileEvaluationContext);
448453
String remoteFilePath = remoteDirectoryPath != null
449454
? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName)
450455
: remoteFileName;
@@ -567,22 +572,16 @@ else if (this.logger.isInfoEnabled()) {
567572
return renamed;
568573
}
569574

570-
private String generateLocalFileName(String remoteFileName) {
575+
private String generateLocalFileName(String remoteFileName,
576+
@Nullable EvaluationContext localFileEvaluationContext) {
577+
571578
if (this.localFilenameGeneratorExpression != null) {
572-
return this.localFilenameGeneratorExpression.getValue(this.evaluationContext, remoteFileName,
579+
return this.localFilenameGeneratorExpression.getValue(localFileEvaluationContext, remoteFileName,
573580
String.class);
574581
}
575582
return remoteFileName;
576583
}
577584

578-
protected void evaluateRemoteDirectory() {
579-
if (this.evaluationContext != null) {
580-
this.evaluatedRemoteDirectory = this.remoteDirectoryExpression.getValue(this.evaluationContext,
581-
String.class);
582-
this.evaluationContext.setVariable("remoteDirectory", this.evaluatedRemoteDirectory);
583-
}
584-
}
585-
586585
/**
587586
* Obtain a metadata for remote file associated with the provided local file.
588587
* @param localFile the local file to retrieve metadata for.

spring-integration-file/src/test/java/org/springframework/integration/file/remote/synchronizer/AbstractRemoteFileSynchronizerTests.java

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2019 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,21 +18,27 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2122
import static org.mockito.Mockito.mock;
2223

2324
import java.io.File;
2425
import java.io.IOException;
2526
import java.io.InputStream;
2627
import java.io.OutputStream;
28+
import java.util.LinkedList;
29+
import java.util.Queue;
2730
import java.util.UUID;
2831
import java.util.concurrent.atomic.AtomicBoolean;
2932
import java.util.concurrent.atomic.AtomicInteger;
3033
import java.util.stream.Collectors;
3134
import java.util.stream.Stream;
3235

33-
import org.junit.Test;
36+
import org.junit.jupiter.api.Test;
37+
import org.junit.jupiter.api.io.TempDir;
3438

3539
import org.springframework.beans.factory.BeanFactory;
40+
import org.springframework.expression.EvaluationContext;
41+
import org.springframework.integration.expression.SupplierExpression;
3642
import org.springframework.integration.file.HeadDirectoryScanner;
3743
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
3844
import org.springframework.integration.file.filters.ChainFileListFilter;
@@ -78,8 +84,10 @@ protected String protocol() {
7884
}
7985

8086
@Override
81-
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, String remoteFile,
87+
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath,
88+
EvaluationContext localFileEvaluationContext, String remoteFile,
8289
File localDirectory, Session<String> session) throws IOException {
90+
8391
if ("bar".equals(remoteFile) && failWhenCopyingBar.getAndSet(false)) {
8492
throw new IOException("fail");
8593
}
@@ -209,13 +217,60 @@ public void testExclusiveWatchService() {
209217
assertThat(count.get()).isEqualTo(1);
210218
}
211219

212-
@Test(expected = IllegalStateException.class)
220+
@Test
213221
public void testScannerAndWatchServiceConflict() {
214222
final AtomicInteger count = new AtomicInteger();
215223
AbstractInboundFileSynchronizingMessageSource<String> source = createSource(count);
216224
source.setUseWatchService(true);
217225
source.setScanner(new HeadDirectoryScanner(1));
218-
source.afterPropertiesSet();
226+
assertThatIllegalStateException()
227+
.isThrownBy(source::afterPropertiesSet);
228+
}
229+
230+
@Test
231+
public void testRemoteDirectoryRefreshedOnEachSynchronization(@TempDir File localDir) {
232+
AbstractInboundFileSynchronizer<String> sync =
233+
new AbstractInboundFileSynchronizer<String>(new StringSessionFactory()) {
234+
235+
@Override
236+
protected boolean isFile(String file) {
237+
return true;
238+
}
239+
240+
@Override
241+
protected String getFilename(String file) {
242+
return file;
243+
}
244+
245+
@Override
246+
protected long getModified(String file) {
247+
return 0;
248+
}
249+
250+
@Override
251+
protected String protocol() {
252+
return "mock";
253+
}
254+
255+
};
256+
257+
Queue<String> remoteDirs = new LinkedList<>();
258+
remoteDirs.add("dir1");
259+
remoteDirs.add("dir2");
260+
sync.setRemoteDirectoryExpression(new SupplierExpression<>(remoteDirs::poll));
261+
sync.setLocalFilenameGeneratorExpressionString("#remoteDirectory+'/'+#root");
262+
sync.setBeanFactory(mock(BeanFactory.class));
263+
sync.afterPropertiesSet();
264+
265+
sync.synchronizeToLocalDirectory(localDir);
266+
sync.synchronizeToLocalDirectory(localDir);
267+
268+
/*Files.find(localDir.toPath(),
269+
Integer.MAX_VALUE,
270+
(filePath, fileAttr) -> fileAttr.isRegularFile())
271+
.forEach(System.out::println);*/
272+
273+
assertThat(localDir.list()).contains("dir1", "dir2");
219274
}
220275

221276
private AbstractInboundFileSynchronizingMessageSource<String> createSource(AtomicInteger count) {
@@ -267,8 +322,10 @@ protected String protocol() {
267322
}
268323

269324
@Override
270-
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath, String remoteFile,
325+
protected boolean copyFileToLocalDirectory(String remoteDirectoryPath,
326+
EvaluationContext localFileEvaluationContext, String remoteFile,
271327
File localDirectory, Session<String> session) {
328+
272329
count.incrementAndGet();
273330
return true;
274331
}
@@ -301,7 +358,7 @@ public boolean remove(String path) {
301358

302359
@Override
303360
public String[] list(String path) {
304-
return new String[] { "foo", "bar", "baz" };
361+
return new String[]{ "foo", "bar", "baz" };
305362
}
306363

307364
@Override
@@ -366,7 +423,7 @@ public Object getClientInstance() {
366423

367424
@Override
368425
public String getHostPort() {
369-
return null;
426+
return "mock:6666";
370427
}
371428

372429
}

spring-integration-ftp/src/test/java/org/springframework/integration/ftp/config/FtpInboundChannelAdapterParserTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,6 +36,7 @@
3636
import org.springframework.context.ApplicationContext;
3737
import org.springframework.expression.Expression;
3838
import org.springframework.integration.endpoint.SourcePollingChannelAdapter;
39+
import org.springframework.integration.expression.ExpressionUtils;
3940
import org.springframework.integration.file.DirectoryScanner;
4041
import org.springframework.integration.file.filters.CompositeFileListFilter;
4142
import org.springframework.integration.file.filters.FileListFilter;
@@ -136,7 +137,8 @@ public void testFtpInboundChannelAdapterComplete() throws Exception {
136137
method.setAccessible(true);
137138
genMethod.set(method);
138139
}, method -> "generateLocalFileName".equals(method.getName()));
139-
assertThat(genMethod.get().invoke(fisync, "foo")).isEqualTo("FOO.afoo");
140+
assertThat(genMethod.get().invoke(fisync, "foo", ExpressionUtils.createStandardEvaluationContext(this.context)))
141+
.isEqualTo("FOO.afoo");
140142
assertThat(inbound.getMaxFetchSize()).isEqualTo(42);
141143
}
142144

0 commit comments

Comments
 (0)