diff --git a/bpopovic/pom.xml b/bpopovic/pom.xml new file mode 100644 index 0000000..d9e2e0e --- /dev/null +++ b/bpopovic/pom.xml @@ -0,0 +1,16 @@ + + + 4.0.0 + + + multithreading-root + com.htec + 1.0-SNAPSHOT + + + bpopovic + 1.0-SNAPSHOT + jar + \ No newline at end of file diff --git a/bpopovic/src/main/java/com/htec/tasks/first/AbstractTaskExecutor.java b/bpopovic/src/main/java/com/htec/tasks/first/AbstractTaskExecutor.java new file mode 100644 index 0000000..d77eda1 --- /dev/null +++ b/bpopovic/src/main/java/com/htec/tasks/first/AbstractTaskExecutor.java @@ -0,0 +1,30 @@ +package com.htec.tasks.first; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public abstract class AbstractTaskExecutor extends Thread { + Consumer successfulCallback; + BiConsumer failCallback; + T task; + + protected AbstractTaskExecutor( + Consumer successfulCallback, BiConsumer failCallback) { + this.successfulCallback = successfulCallback; + this.failCallback = failCallback; + } + + public void addTask(T task) { + this.task = task; + } + + @Override + public void run() { + try { + task.run(); + successfulCallback.accept(task); + } catch (Exception e) { + failCallback.accept(task, e); + } + } +} diff --git a/bpopovic/src/main/java/com/htec/tasks/first/AbstractTaskExecutorFactory.java b/bpopovic/src/main/java/com/htec/tasks/first/AbstractTaskExecutorFactory.java new file mode 100644 index 0000000..43595ca --- /dev/null +++ b/bpopovic/src/main/java/com/htec/tasks/first/AbstractTaskExecutorFactory.java @@ -0,0 +1,9 @@ +package com.htec.tasks.first; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public abstract class AbstractTaskExecutorFactory { + abstract AbstractTaskExecutor getAbstractTaskExecutor( + Consumer successfulCallback, BiConsumer failCallback); +} diff --git a/bpopovic/src/main/java/com/htec/tasks/first/BulkTaskRunner.java b/bpopovic/src/main/java/com/htec/tasks/first/BulkTaskRunner.java new file mode 100644 index 0000000..b5fd965 --- /dev/null +++ b/bpopovic/src/main/java/com/htec/tasks/first/BulkTaskRunner.java @@ -0,0 +1,70 @@ +package com.htec.tasks.first; + +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public class BulkTaskRunner implements AutoCloseable { + + private AbstractTaskExecutorFactory abstractTaskExecutorFactory; + private List availableThreads; + private List workingThreads; + + private Consumer successfulCallback; + private BiConsumer failCallback; + private int numberOfThreads; + private boolean isBlocked = false; + + public void start( + int numberOfThreads, Consumer successfulCallback, BiConsumer failCallback) { + this.successfulCallback = successfulCallback; + this.failCallback = failCallback; + this.numberOfThreads = numberOfThreads; + for (int i = 0; i < numberOfThreads; i++) { + availableThreads.add( + abstractTaskExecutorFactory.getAbstractTaskExecutor(successfulCallback, failCallback)); + } + } + + public synchronized void addTask(T task) { + while (true) { + if (availableThreads.size() + workingThreads.size() < numberOfThreads) { + availableThreads.add( + abstractTaskExecutorFactory.getAbstractTaskExecutor(successfulCallback, failCallback)); + continue; + } + if (availableThreads.size() > 0 && !isBlocked) { + AbstractTaskExecutor thread = availableThreads.remove(0); + thread.addTask(task); + thread.start(); + workingThreads.add(thread); + break; + } else { + try { + wait(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + public synchronized void waitTillFinished() { + isBlocked = true; + } + + public int getNumberOfRunningExecutors() { + return workingThreads.size(); + } + + @Override + public void close() throws Exception { + for (AbstractTaskExecutor thread : availableThreads) { + thread.interrupt(); + } + + for (AbstractTaskExecutor thread : workingThreads) { + thread.interrupt(); + } + } +} diff --git a/bpopovic/src/main/java/com/htec/tasks/second/AbstractTaskExecutor.java b/bpopovic/src/main/java/com/htec/tasks/second/AbstractTaskExecutor.java new file mode 100644 index 0000000..e2d22c7 --- /dev/null +++ b/bpopovic/src/main/java/com/htec/tasks/second/AbstractTaskExecutor.java @@ -0,0 +1,44 @@ +package com.htec.tasks.second; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public abstract class AbstractTaskExecutor extends Thread { + Consumer successfulCallback; + BiConsumer failCallback; + T task; + private static final int MAX_RETRY = 5; + private boolean success = false; + + protected AbstractTaskExecutor( + Consumer successfulCallback, BiConsumer failCallback) { + this.successfulCallback = successfulCallback; + this.failCallback = failCallback; + } + + public void addTask(T task) { + this.task = task; + } + + @Override + public void run() { + int numberOfRuns = 1; + + while (numberOfRuns <= MAX_RETRY && !success) { + try { + task.run(); + success = true; + successfulCallback.accept(task); + } catch (Exception e) { + numberOfRuns++; + if (numberOfRuns == MAX_RETRY) { + failCallback.accept(task, e); + } + } + } + } + + public boolean getSuccessStatus(){ + return success; + } +} diff --git a/bpopovic/src/main/java/com/htec/tasks/second/AbstractTaskExecutorFactory.java b/bpopovic/src/main/java/com/htec/tasks/second/AbstractTaskExecutorFactory.java new file mode 100644 index 0000000..92f4573 --- /dev/null +++ b/bpopovic/src/main/java/com/htec/tasks/second/AbstractTaskExecutorFactory.java @@ -0,0 +1,9 @@ +package com.htec.tasks.second; + +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public abstract class AbstractTaskExecutorFactory { + abstract AbstractTaskExecutor getAbstractTaskExecutor( + Consumer successfulCallback, BiConsumer failCallback); +} diff --git a/bpopovic/src/main/java/com/htec/tasks/second/BulkTaskRunner.java b/bpopovic/src/main/java/com/htec/tasks/second/BulkTaskRunner.java new file mode 100644 index 0000000..6c73af3 --- /dev/null +++ b/bpopovic/src/main/java/com/htec/tasks/second/BulkTaskRunner.java @@ -0,0 +1,71 @@ +package com.htec.tasks.second; + +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.Consumer; + +public class BulkTaskRunner implements AutoCloseable { + + private AbstractTaskExecutorFactory abstractTaskExecutorFactory; + private List availableThreads; + private List workingThreads; + + private Consumer successfulCallback; + private BiConsumer failCallback; + private int numberOfThreads; + private boolean isBlocked = true; + + public void start( + int numberOfThreads, Consumer successfulCallback, BiConsumer failCallback) { + this.successfulCallback = successfulCallback; + this.failCallback = failCallback; + this.numberOfThreads = numberOfThreads; + for (int i = 0; i < numberOfThreads; i++) { + availableThreads.add( + abstractTaskExecutorFactory.getAbstractTaskExecutor(successfulCallback, failCallback)); + } + isBlocked = false; + } + + public synchronized void addTask(T task) { + while (true) { + if (availableThreads.size() + workingThreads.size() < numberOfThreads) { + availableThreads.add( + abstractTaskExecutorFactory.getAbstractTaskExecutor(successfulCallback, failCallback)); + continue; + } + if (availableThreads.size() > 0 && !isBlocked) { + AbstractTaskExecutor thread = availableThreads.remove(0); + thread.addTask(task); + thread.start(); + workingThreads.add(thread); + break; + } else { + try { + wait(1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + public synchronized void waitTillFinished() { + isBlocked = true; + } + + public synchronized int getNumberOfRunningExecutors() { + return workingThreads.size(); + } + + @Override + public void close() throws Exception { + for (AbstractTaskExecutor thread : availableThreads) { + thread.interrupt(); + } + + for (AbstractTaskExecutor thread : workingThreads) { + thread.interrupt(); + } + } +} diff --git a/pom.xml b/pom.xml index c312174..6fb0bb5 100644 --- a/pom.xml +++ b/pom.xml @@ -31,6 +31,7 @@ bulk-task-runner data-augmentation utils + bpopovic