Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions bpopovic/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>multithreading-root</artifactId>
<groupId>com.htec</groupId>
<version>1.0-SNAPSHOT</version>
</parent>

<artifactId>bpopovic</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.htec.tasks.first;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

public abstract class AbstractTaskExecutor<T extends Runnable> extends Thread {
Consumer<T> successfulCallback;
BiConsumer<T, Exception> failCallback;
T task;

protected AbstractTaskExecutor(
Consumer<T> successfulCallback, BiConsumer<T, Exception> failCallback) {
this.successfulCallback = successfulCallback;
this.failCallback = failCallback;
}

public void addTask(T task) {
this.task = task;
}

@Override
public void run() {
try {
task.run();
successfulCallback.accept(task);
} catch (Exception e) {
failCallback.accept(task, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.htec.tasks.first;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

public abstract class AbstractTaskExecutorFactory<T> {
abstract AbstractTaskExecutor getAbstractTaskExecutor(
Consumer<T> successfulCallback, BiConsumer<T, Exception> failCallback);
}
70 changes: 70 additions & 0 deletions bpopovic/src/main/java/com/htec/tasks/first/BulkTaskRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.htec.tasks.first;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class BulkTaskRunner<T extends Runnable> implements AutoCloseable {

private AbstractTaskExecutorFactory abstractTaskExecutorFactory;
private List<AbstractTaskExecutor> availableThreads;
private List<AbstractTaskExecutor> workingThreads;

private Consumer<T> successfulCallback;
private BiConsumer<T, Exception> failCallback;
private int numberOfThreads;
private boolean isBlocked = false;

public void start(
int numberOfThreads, Consumer<T> successfulCallback, BiConsumer<T, Exception> failCallback) {
this.successfulCallback = successfulCallback;
this.failCallback = failCallback;
this.numberOfThreads = numberOfThreads;
for (int i = 0; i < numberOfThreads; i++) {
availableThreads.add(
abstractTaskExecutorFactory.getAbstractTaskExecutor(successfulCallback, failCallback));
}
}

public synchronized void addTask(T task) {
while (true) {
if (availableThreads.size() + workingThreads.size() < numberOfThreads) {
availableThreads.add(
abstractTaskExecutorFactory.getAbstractTaskExecutor(successfulCallback, failCallback));
continue;
}
if (availableThreads.size() > 0 && !isBlocked) {
AbstractTaskExecutor thread = availableThreads.remove(0);
thread.addTask(task);
thread.start();
workingThreads.add(thread);
break;
} else {
try {
wait(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

public synchronized void waitTillFinished() {
isBlocked = true;
}

public int getNumberOfRunningExecutors() {
return workingThreads.size();
}

@Override
public void close() throws Exception {
for (AbstractTaskExecutor thread : availableThreads) {
thread.interrupt();
}

for (AbstractTaskExecutor thread : workingThreads) {
thread.interrupt();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.htec.tasks.second;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

public abstract class AbstractTaskExecutor<T extends Runnable> extends Thread {
Consumer<T> successfulCallback;
BiConsumer<T, Exception> failCallback;
T task;
private static final int MAX_RETRY = 5;
private boolean success = false;

protected AbstractTaskExecutor(
Consumer<T> successfulCallback, BiConsumer<T, Exception> failCallback) {
this.successfulCallback = successfulCallback;
this.failCallback = failCallback;
}

public void addTask(T task) {
this.task = task;
}

@Override
public void run() {
int numberOfRuns = 1;

while (numberOfRuns <= MAX_RETRY && !success) {
try {
task.run();
success = true;
successfulCallback.accept(task);
} catch (Exception e) {
numberOfRuns++;
if (numberOfRuns == MAX_RETRY) {
failCallback.accept(task, e);
}
}
}
}

public boolean getSuccessStatus(){
return success;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.htec.tasks.second;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

public abstract class AbstractTaskExecutorFactory<T> {
abstract AbstractTaskExecutor getAbstractTaskExecutor(
Consumer<T> successfulCallback, BiConsumer<T, Exception> failCallback);
}
71 changes: 71 additions & 0 deletions bpopovic/src/main/java/com/htec/tasks/second/BulkTaskRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.htec.tasks.second;

import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class BulkTaskRunner<T extends Runnable> implements AutoCloseable {

private AbstractTaskExecutorFactory abstractTaskExecutorFactory;
private List<AbstractTaskExecutor> availableThreads;
private List<AbstractTaskExecutor> workingThreads;

private Consumer<T> successfulCallback;
private BiConsumer<T, Exception> failCallback;
private int numberOfThreads;
private boolean isBlocked = true;

public void start(
int numberOfThreads, Consumer<T> successfulCallback, BiConsumer<T, Exception> failCallback) {
this.successfulCallback = successfulCallback;
this.failCallback = failCallback;
this.numberOfThreads = numberOfThreads;
for (int i = 0; i < numberOfThreads; i++) {
availableThreads.add(
abstractTaskExecutorFactory.getAbstractTaskExecutor(successfulCallback, failCallback));
}
isBlocked = false;
}

public synchronized void addTask(T task) {
while (true) {
if (availableThreads.size() + workingThreads.size() < numberOfThreads) {
availableThreads.add(
abstractTaskExecutorFactory.getAbstractTaskExecutor(successfulCallback, failCallback));
continue;
}
if (availableThreads.size() > 0 && !isBlocked) {
AbstractTaskExecutor thread = availableThreads.remove(0);
thread.addTask(task);
thread.start();
workingThreads.add(thread);
break;
} else {
try {
wait(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}

public synchronized void waitTillFinished() {
isBlocked = true;
}

public synchronized int getNumberOfRunningExecutors() {
return workingThreads.size();
}

@Override
public void close() throws Exception {
for (AbstractTaskExecutor thread : availableThreads) {
thread.interrupt();
}

for (AbstractTaskExecutor thread : workingThreads) {
thread.interrupt();
}
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<module>bulk-task-runner</module>
<module>data-augmentation</module>
<module>utils</module>
<module>bpopovic</module>
</modules>

<dependencyManagement>
Expand Down