From 5191fce5a69f5d11bb8c4d3305d791a948905856 Mon Sep 17 00:00:00 2001 From: Mark Libucha Date: Sat, 16 Jan 2016 14:28:42 -0800 Subject: [PATCH] Eliminate thread leaks in threaded listeners. --- .../org/lwes/listener/DatagramDequeuer.java | 15 +++++---- .../org/lwes/listener/DatagramEnqueuer.java | 10 ++++-- .../org/lwes/listener/ThreadedDequeuer.java | 5 +++ .../listener/ThreadedEventDispatcher.java | 9 +++-- .../java/org/lwes/listener/TestListener.java | 33 +++++++++++++++++++ 5 files changed, 62 insertions(+), 10 deletions(-) create mode 100644 src/test/java/org/lwes/listener/TestListener.java diff --git a/src/main/java/org/lwes/listener/DatagramDequeuer.java b/src/main/java/org/lwes/listener/DatagramDequeuer.java index 66c0f01..16137d7 100644 --- a/src/main/java/org/lwes/listener/DatagramDequeuer.java +++ b/src/main/java/org/lwes/listener/DatagramDequeuer.java @@ -20,12 +20,13 @@ import java.io.IOException; import java.net.DatagramPacket; +import java.util.concurrent.TimeUnit; public class DatagramDequeuer extends ThreadedDequeuer { private static transient Log log = LogFactory.getLog(DatagramDequeuer.class); - private boolean running = false; + private volatile boolean running = false; /* an event factory */ private EventFactory factory = new EventFactory(); @@ -40,6 +41,7 @@ public void initialize() throws IOException { @Override public synchronized void shutdown() { + super.shutdown(); running = false; } @@ -50,11 +52,12 @@ public void run() { while (running) { try { QueueElement element = null; - element = queue.take(); - if (log.isTraceEnabled()) { - log.trace("Removed from queue: " + element); - } - handleElement((DatagramQueueElement) element); + if ((element = queue.poll(1,TimeUnit.SECONDS)) != null) { + if (log.isTraceEnabled()) { + log.trace("Removed from queue: " + element); + } + handleElement((DatagramQueueElement) element); + } } catch (UnsupportedOperationException uoe) { // not a problem, someone grabbed the event before we did diff --git a/src/main/java/org/lwes/listener/DatagramEnqueuer.java b/src/main/java/org/lwes/listener/DatagramEnqueuer.java index ee97325..8bbc53b 100644 --- a/src/main/java/org/lwes/listener/DatagramEnqueuer.java +++ b/src/main/java/org/lwes/listener/DatagramEnqueuer.java @@ -17,6 +17,7 @@ import java.net.DatagramSocket; import java.net.InetAddress; import java.net.MulticastSocket; +import java.net.SocketTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,7 +51,7 @@ public class DatagramEnqueuer extends ThreadedEnqueuer { protected byte[] buffer = null; /* thread control */ - protected boolean running = false; + protected volatile boolean running = false; public DatagramEnqueuer() { super(); @@ -163,6 +164,7 @@ public void initialize() throws IOException { bufSize = Integer.parseInt(bufSizeStr); } socket.setReceiveBufferSize(bufSize); + socket.setSoTimeout(1000); } @Override @@ -181,7 +183,11 @@ public void run() { while (running) { try { DatagramPacket datagram = new DatagramPacket(buffer, buffer.length); - socket.receive(datagram); + try { + socket.receive(datagram); + } catch (SocketTimeoutException ste) { + continue; + } if (log.isTraceEnabled()) { log.trace("Received datagram: " + datagram); } diff --git a/src/main/java/org/lwes/listener/ThreadedDequeuer.java b/src/main/java/org/lwes/listener/ThreadedDequeuer.java index f71f775..3220810 100644 --- a/src/main/java/org/lwes/listener/ThreadedDequeuer.java +++ b/src/main/java/org/lwes/listener/ThreadedDequeuer.java @@ -174,6 +174,11 @@ public void initialize() throws IOException { * ThreadedDequeuer. */ public void shutdown() { + synchronized (idleProcessors) { + for (ThreadedEventDispatcher d : idleProcessors) { + d.shutdown(); + } + } } /** diff --git a/src/main/java/org/lwes/listener/ThreadedEventDispatcher.java b/src/main/java/org/lwes/listener/ThreadedEventDispatcher.java index e4e4d12..c213219 100644 --- a/src/main/java/org/lwes/listener/ThreadedEventDispatcher.java +++ b/src/main/java/org/lwes/listener/ThreadedEventDispatcher.java @@ -29,6 +29,7 @@ public class ThreadedEventDispatcher extends Thread { private ThreadedDequeuer dequeuer; private EventHandler eventHandler; private Event event; + private volatile boolean stop = false; protected ThreadedEventDispatcher(ThreadedDequeuer aDequeuer) { this.dequeuer = aDequeuer; @@ -61,7 +62,7 @@ public final boolean isIdle() { @Override public void run() { - while(true) { + while(!stop) { synchronized(this) { if(isActive()) { try { @@ -72,7 +73,7 @@ public void run() { clearTask(); } else { try { - wait(); + wait(1000); } catch(InterruptedException e) {} } } @@ -88,4 +89,8 @@ private void clearTask() { dequeuer.makeAvailable(this); } + public void shutdown() { + stop = true; + } + } diff --git a/src/test/java/org/lwes/listener/TestListener.java b/src/test/java/org/lwes/listener/TestListener.java new file mode 100644 index 0000000..4aa8f4a --- /dev/null +++ b/src/test/java/org/lwes/listener/TestListener.java @@ -0,0 +1,33 @@ +import static java.lang.System.out; +import java.net.InetAddress; +import java.net.UnknownHostException; +import org.junit.Ignore; +import org.lwes.listener.DatagramEventListener; +import org.lwes.listener.EventHandler; + +/* + * Not a unit test. Demonstrates thread leaks in threaded listeners. + * Before reaching 100 will be unable to spawn new threads with default JVM settings. + * Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread + */ + +@Ignore +public class TestListener { + + public static void main(String[] args) throws InterruptedException, UnknownHostException { + int count = 0; + while (true) { + DatagramEventListener listener; + listener = new DatagramEventListener(); + listener.setAddress(InetAddress.getByName("224.0.0.69")); + listener.setPort(9191); + listener.setQueueSize(50000); + listener.initialize(); + out.println(count); + Thread.currentThread().sleep(1000); + listener.shutdown(); + count++; + } + } + +}