+ * Note that the method is not called under the session lock. It means that if + * implementation + * requires access to the application/session data then the session has to be + * locked explicitly. + *
+ * If a semaphore has been set, it controls access to this method, enforcing a
+ * timeout. A permit
+ * will be acquired from the semaphore, if one becomes available within the
+ * given waiting time and
+ * the current thread has not been {@linkplain Thread#interrupt interrupted}.
+ *
+ * @param event the download event containing the output stream and session
+ * @throws IOException if an IO error occurred
+ * @throws InterruptedIOException if the current thread is interrupted
+ * @throws InterruptedByTimeoutException if the waiting time elapsed before a
+ * permit was acquired
+ */
+ @Override
+ public final void handleDownloadRequest(DownloadEvent event) throws IOException {
+ runWithSemaphore(event.getSession(), () -> delegate.handleDownloadRequest(event));
+ }
+
+}
diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentOperationBase.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentOperationBase.java
new file mode 100644
index 0000000..d345f6c
--- /dev/null
+++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentOperationBase.java
@@ -0,0 +1,258 @@
+/*-
+ * #%L
+ * Grid Exporter Add-on
+ * %%
+ * Copyright (C) 2022 - 2024 Flowing Code
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+package com.flowingcode.vaadin.addons.gridexporter;
+
+import com.vaadin.flow.component.UI;
+import com.vaadin.flow.server.VaadinSession;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.channels.InterruptedByTimeoutException;
+import java.util.Optional;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.IntFunction;
+
+/**
+ * Base class containing shared semaphore logic for concurrent download/upload
+ * control.
+ * This class is used by both ConcurrentStreamResourceWriter and
+ * ConcurrentDownloadHandler
+ * to avoid code duplication.
+ *
+ * @author Javier Godoy
+ */
+@SuppressWarnings("serial")
+abstract class ConcurrentOperationBase {
+
+ public static final float MAX_COST = 0x7FFF;
+ public static final float MIN_COST = 1.0f / 0x10000;
+ public static final float DEFAULT_COST = 1.0f;
+
+ static final ConfigurableSemaphore semaphore = new ConfigurableSemaphore();
+ static volatile boolean enabled;
+ static volatile boolean failOnUiChange;
+
+ static final class ConfigurableSemaphore extends Semaphore {
+
+ int maxPermits; // package-private for access from subclasses
+
+ ConfigurableSemaphore() {
+ super(0);
+ }
+
+ synchronized void setPermits(int permits) {
+ if (permits < 0) {
+ throw new IllegalArgumentException();
+ }
+ int delta = permits - maxPermits;
+ if (delta > 0) {
+ super.release(delta);
+ } else if (delta < 0) {
+ super.reducePermits(-delta);
+ }
+ maxPermits = permits;
+ }
+
+ @Override
+ public String toString() {
+ IntFunction
+ * Finite limits are capped to {@link #MAX_COST} (32767). If the limit is
+ * {@link Float#POSITIVE_INFINITY POSITIVE_INFINITY}, the semaphore will not be
+ * used for
+ * controlling concurrent operations.
+ *
+ * @param limit the maximum cost of concurrent operations allowed
+ * @throws IllegalArgumentException if the limit is zero or negative.
+ */
+ public static void setLimit(float limit) {
+ if (limit <= 0) {
+ throw new IllegalArgumentException();
+ }
+ if (Float.isInfinite(limit)) {
+ enabled = false;
+ return;
+ }
+
+ synchronized (semaphore) {
+ enabled = true;
+ semaphore.setPermits(costToPermits(limit, Integer.MAX_VALUE));
+ }
+ }
+
+ static void setFailOnUiChange(boolean failOnUiChange) {
+ ConcurrentOperationBase.failOnUiChange = failOnUiChange;
+ }
+
+ /**
+ * Returns the limit for the number of concurrent operations.
+ *
+ * @return the limit for the number of concurrent operations, or
+ * {@link Float#POSITIVE_INFINITY}
+ * if the semaphore is disabled.
+ */
+ public static float getLimit() {
+ if (enabled) {
+ synchronized (semaphore) {
+ return (float) semaphore.maxPermits / 0x10000;
+ }
+ } else {
+ return Float.POSITIVE_INFINITY;
+ }
+ }
+
+ static int costToPermits(float cost, int maxPermits) {
+ // restrict limit to 0x7fff to ensure the cost can be represented
+ // using fixed-point arithmetic with 16 fractional digits and 15 integral digits
+ cost = Math.min(cost, MAX_COST);
+ // Determine the number of permits required based on the cost, capping at
+ // maxPermits.
+ // If the cost is zero or negative, no permits are needed.
+ // Any positive cost, no matter how small, will require at least one permit.
+ return cost <= 0 ? 0 : Math.max(Math.min((int) (cost * 0x10000), maxPermits), 1);
+ }
+
+ /**
+ * Sets the timeout for acquiring a permit to start a download when there are
+ * not enough permits
+ * available in the semaphore.
+ *
+ * @return the timeout in nanoseconds.
+ */
+ public abstract long getTimeout();
+
+ /**
+ * Returns the cost of this download.
+ *
+ * Note that the method is not called under the session lock. It means that if
+ * implementation
+ * requires access to the application/session data then the session has to be
+ * locked explicitly.
+ *
+ * @param session vaadin session
+ */
+ public float getCost(VaadinSession session) {
+ return DEFAULT_COST;
+ }
+
+ /**
+ * Returns the UI associated with the current download.
+ *
+ * This method is used to ensure that the UI is still attached to the current
+ * session when a
+ * download is initiated. Implementations should return the appropriate UI
+ * instance.
+ *
+ * Implementations can use this method to perform any necessary actions in
+ * response to the
+ * timeout, such as logging a warning or notifying the user.
+ *
+ * This method is called at the start of the download process.
+ * Subclasses should implement this method to perform any necessary actions
+ * before the download
+ * begins.
+ */
+ protected abstract void onAccept();
+
+ /**
+ * Callback method that is invoked when a download finishes.
+ *
+ * This method is called at the end of the download process.
+ * Subclasses should implement this method to perform any necessary actions
+ * after the download
+ * completes.
+ */
+ protected abstract void onFinish();
+
+ @FunctionalInterface
+ protected interface RunnableWithIOException {
+ void run() throws IOException;
+ }
+
+ protected void runWithSemaphore(VaadinSession session, RunnableWithIOException task)
+ throws IOException {
+ onAccept();
+ try {
+ if (!enabled) {
+ task.run();
+ } else {
+ try {
+ int permits;
+ float cost = getCost(session);
+ synchronized (semaphore) {
+ permits = costToPermits(cost, semaphore.maxPermits);
+ }
+
+ UI ui = failOnUiChange ? getAttachedUI() : null;
+
+ if (semaphore.tryAcquire(permits, getTimeout(), TimeUnit.NANOSECONDS)) {
+ try {
+ if (ui != null && getAttachedUI() != ui) {
+ throw new IOException("Detached UI");
+ }
+ task.run();
+ } finally {
+ semaphore.release(permits);
+ }
+ } else {
+ onTimeout();
+ throw new InterruptedByTimeoutException();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw (IOException) new InterruptedIOException().initCause(e);
+ }
+ }
+ } finally {
+ onFinish();
+ }
+ }
+}
diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java
index 28fcd11..fb3678d 100644
--- a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java
+++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/ConcurrentStreamResourceWriter.java
@@ -19,132 +19,39 @@
*/
package com.flowingcode.vaadin.addons.gridexporter;
-import com.vaadin.flow.component.UI;
import com.vaadin.flow.server.StreamResourceWriter;
import com.vaadin.flow.server.VaadinSession;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.channels.InterruptedByTimeoutException;
-import java.util.Optional;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.function.IntFunction;
/**
* An implementation of {@link StreamResourceWriter} that controls access to the
- * {@link #accept(OutputStream, VaadinSession) accept} method using a semaphore to manage
+ * {@link #accept(OutputStream, VaadinSession) accept} method using a semaphore
+ * to manage
* concurrency.
*
* @author Javier Godoy
*/
@SuppressWarnings("serial")
-abstract class ConcurrentStreamResourceWriter implements StreamResourceWriter {
-
- public static final float MAX_COST = 0x7FFF;
-
- public static final float MIN_COST = 1.0f / 0x10000;
-
- public static final float DEFAULT_COST = 1.0f;
-
- private static final ConfigurableSemaphore semaphore = new ConfigurableSemaphore();
-
- private static volatile boolean enabled;
-
- private static volatile boolean failOnUiChange;
+abstract class ConcurrentStreamResourceWriter extends ConcurrentOperationBase implements StreamResourceWriter {
private final StreamResourceWriter delegate;
- private static final class ConfigurableSemaphore extends Semaphore {
-
- private int maxPermits;
-
- ConfigurableSemaphore() {
- super(0);
- }
-
- synchronized void setPermits(int permits) {
- if (permits < 0) {
- throw new IllegalArgumentException();
- }
- int delta = permits - maxPermits;
- if (delta > 0) {
- super.release(delta);
- } else if (delta < 0) {
- super.reducePermits(-delta);
- }
- maxPermits = permits;
- }
-
- @Override
- public String toString() {
- IntFunction
- * Finite limits are capped to {@link #MAX_COST} (32767). If the limit is
- * {@link Float#POSITIVE_INFINITY POSITIVE_INFINITY}, the semaphore will not be used for
- * controlling concurrent downloads.
- *
- * @param limit the maximum cost of concurrent downloads allowed
- * @throws IllegalArgumentException if the limit is zero or negative.
- */
public static void setLimit(float limit) {
- if (limit <= 0) {
- throw new IllegalArgumentException();
- }
- if (Float.isInfinite(limit)) {
- enabled = false;
- return;
- }
-
- synchronized (semaphore) {
- enabled = true;
- semaphore.setPermits(costToPermits(limit, Integer.MAX_VALUE));
- }
- }
-
- static void setFailOnUiChange(boolean failOnUiChange) {
- ConcurrentStreamResourceWriter.failOnUiChange = failOnUiChange;
+ ConcurrentOperationBase.setLimit(limit);
}
- /**
- * Returns the limit for the number of concurrent downloads.
- *
- * @return the limit for the number of concurrent downloads, or {@link Float#POSITIVE_INFINITY} if
- * the semaphore is disabled.
- */
public static float getLimit() {
- if (enabled) {
- synchronized (semaphore) {
- return (float) semaphore.maxPermits / 0x10000;
- }
- } else {
- return Float.POSITIVE_INFINITY;
- }
- }
-
- private static int costToPermits(float cost, int maxPermits) {
- // restrict limit to 0x7fff to ensure the cost can be represented
- // using fixed-point arithmetic with 16 fractional digits and 15 integral digits
- cost = Math.min(cost, MAX_COST);
- // Determine the number of permits required based on the cost, capping at maxPermits.
- // If the cost is zero or negative, no permits are needed.
- // Any positive cost, no matter how small, will require at least one permit.
- return cost <= 0 ? 0 : Math.max(Math.min((int) (cost * 0x10000), maxPermits), 1);
+ return ConcurrentOperationBase.getLimit();
}
/**
- * Constructs a {@code ConcurrentStreamResourceWriter} with the specified delegate. The delegate
- * is a {@link StreamResourceWriter} that performs the actual writing to the stream.
+ * Constructs a {@code ConcurrentStreamResourceWriter} with the specified
+ * delegate. The delegate
+ * is a {@link StreamResourceWriter} that performs the actual writing to the
+ * stream.
*
* @param delegate the delegate {@code InputStreamFactory}
*/
@@ -153,138 +60,30 @@ private static int costToPermits(float cost, int maxPermits) {
}
/**
- * Sets the timeout for acquiring a permit to start a download when there are not enough permits
- * available in the semaphore.
- *
- * @see GridExporter#setConcurrentDownloadTimeout(long, TimeUnit)
- * @return the timeout in nanoseconds.
- */
- public abstract long getTimeout();
-
- /**
- * Returns the cost of this download.
- *
- * Note that the method is not called under the session lock. It means that if implementation
- * requires access to the application/session data then the session has to be locked explicitly.
- *
- * @param session vaadin session
- * @see GridExporter#setConcurrentDownloadCost(float)
- */
- public float getCost(VaadinSession session) {
- return DEFAULT_COST;
- }
-
- /**
- * Returns the UI associated with the current download.
- *
- * This method is used to ensure that the UI is still attached to the current session when a
- * download is initiated. Implementations should return the appropriate UI instance.
- *
- * Implementations can use this method to perform any necessary actions in response to the
- * timeout, such as logging a warning or notifying the user.
- *
- * This method is called at the start of the download process, right after the
- * {@link #accept(OutputStream, VaadinSession) accept} method is invoked and it has been
- * determined that the download can proceed. Subclasses should implement this method to perform
- * any necessary actions before the download begins, such as initializing resources, logging, or
- * updating the UI to reflect the start of the download.
- *
- * Note that this method is called before any semaphore permits are acquired, so it is executed
- * regardless of whether the semaphore is enabled or not.
- *
- * This method is called at the end of the download process, right before the
- * {@link #accept(OutputStream, VaadinSession) accept} method returns, regardless of whether the
- * download was successful, timed out, or encountered an error. Subclasses should implement this
- * method to perform any necessary actions after the download completes, such as releasing
- * resources, logging, or updating the UI to reflect the completion of the download.
+ * Note that the method is not called under the session lock. It means that if
+ * implementation
+ * requires access to the application/session data then the session has to be
+ * locked explicitly.
*
- * Note that this method is always called, even if an exception is thrown during the download
- * process, ensuring that any necessary cleanup can be performed.
- *
- * Note that the method is not called under the session lock. It means that if implementation
- * requires access to the application/session data then the session has to be locked explicitly.
- *
- * If a semaphore has been set, it controls access to this method, enforcing a timeout. A permit
- * will be acquired from the semaphore, if one becomes available within the given waiting time and
+ * If a semaphore has been set, it controls access to this method, enforcing a
+ * timeout. A permit
+ * will be acquired from the semaphore, if one becomes available within the
+ * given waiting time and
* the current thread has not been {@linkplain Thread#interrupt interrupted}.
*
- * @param stream data output stream
+ * @param stream data output stream
* @param session vaadin session
- * @throws IOException if an IO error occurred
- * @throws InterruptedIOException if the current thread is interrupted
- * @throws InterruptedByTimeoutException if the waiting time elapsed before a permit was acquired
+ * @throws IOException if an IO error occurred
+ * @throws InterruptedIOException if the current thread is interrupted
+ * @throws InterruptedByTimeoutException if the waiting time elapsed before a
+ * permit was acquired
*/
@Override
public final void accept(OutputStream stream, VaadinSession session) throws IOException {
- onAccept();
- try {
- if (!enabled) {
- delegate.accept(stream, session);
- } else {
-
- try {
- int permits;
- float cost = getCost(session);
- synchronized (semaphore) {
- permits = costToPermits(cost, semaphore.maxPermits);
- }
-
- UI ui = failOnUiChange ? getAttachedUI() : null;
-
- if (semaphore.tryAcquire(permits, getTimeout(), TimeUnit.NANOSECONDS)) {
- try {
- if (ui != null && getAttachedUI()!=ui) {
- // The UI has changed or was detached after acquirig the semaphore
- throw new IOException("Detached UI");
- }
- delegate.accept(stream, session);
- } finally {
- semaphore.release(permits);
- }
- } else {
- onTimeout();
- throw new InterruptedByTimeoutException();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw (IOException) new InterruptedIOException().initCause(e);
- }
- }
- } finally {
- onFinish();
- }
+ runWithSemaphore(session, () -> delegate.accept(stream, session));
}
}
diff --git a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java
index 5ff3b56..f47b18b 100644
--- a/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java
+++ b/src/main/java/com/flowingcode/vaadin/addons/gridexporter/GridExporter.java
@@ -42,6 +42,8 @@
import com.vaadin.flow.server.StreamResource;
import com.vaadin.flow.server.StreamResourceWriter;
import com.vaadin.flow.server.VaadinSession;
+import com.vaadin.flow.server.streams.DownloadHandler;
+
import com.vaadin.flow.shared.Registration;
import java.io.Serializable;
import java.lang.reflect.Field;
@@ -221,13 +223,13 @@ Object extractValueFromColumn(T item, Column