diff --git a/java/net/anotheria/idbasedlock/IdReentrantLock.java b/java/net/anotheria/idbasedlock/IdReentrantLock.java new file mode 100644 index 0000000..a99631a --- /dev/null +++ b/java/net/anotheria/idbasedlock/IdReentrantLock.java @@ -0,0 +1,42 @@ +package net.anotheria.idbasedlock; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author + */ +public class IdReentrantLock extends ReentrantLock { + + private T id; + private AtomicInteger refs = new AtomicInteger(0); + + IdReentrantLock(T id) { + this.id = id; + } + + public T getId() { + return id; + } + + public int getRefs() { + return refs.get(); + } + + @Override + public void lock() { + refs.incrementAndGet(); + super.lock(); + } + + @Override + public void unlock() { + refs.decrementAndGet(); + super.unlock(); + } + + @Override + protected Thread getOwner() { + return super.getOwner(); + } +} diff --git a/java/net/anotheria/idbasedlock/ParallelLockManager.java b/java/net/anotheria/idbasedlock/ParallelLockManager.java new file mode 100644 index 0000000..63e52f8 --- /dev/null +++ b/java/net/anotheria/idbasedlock/ParallelLockManager.java @@ -0,0 +1,73 @@ +package net.anotheria.idbasedlock; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.StampedLock; + +public class ParallelLockManager { + + private Map> locks = new ConcurrentHashMap<>(); + + // private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); + // private final Lock readLock = rwl.readLock(); + // private final Lock writeLock = rwl.writeLock(); +// private StampedLock stampedLock = new StampedLock(); + + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + public ParallelLockManager() { + initLockCleaner(); + } + + private void initLockCleaner() { + long initDelay = TimeUnit.SECONDS.toMillis(1); + long period = TimeUnit.SECONDS.toMillis(1); + scheduler.scheduleAtFixedRate(new RunnableJob(), initDelay, period, TimeUnit.MILLISECONDS); + } + + private class RunnableJob implements Runnable { + @Override + public void run() { + if (locks == null || locks.size() == 0) { + return; + } + +// long stamp = stampedLock.writeLock(); + try { + locks.entrySet().removeIf(e -> e.getValue().getRefs() <= 0); + } catch (Exception e) { + e.printStackTrace(); + } finally { +// stampedLock.unlockWrite(stamp); + //writeLock.unlock(); + } + } + } + + public IdReentrantLock lock(T id) { + if (id == null) { + throw new IllegalArgumentException("getLock() failed: id can't be null"); + } + +// long stamp = stampedLock.readLock(); + // readLock.lock(); + try { + IdReentrantLock lock = locks.computeIfAbsent(id, v -> new IdReentrantLock<>(id)); + + lock.lock(); + + return lock; + } finally { +// stampedLock.unlockRead(stamp); + //// readLock.unlock(); + } + + } + + int getLockSize() { + return locks.size(); + } +} \ No newline at end of file diff --git a/test/junit/net/anotheria/idbasedlock/IdBasedLockManagerTest.java b/test/junit/net/anotheria/idbasedlock/IdBasedLockManagerTest.java index c07b2d7..a709ab5 100644 --- a/test/junit/net/anotheria/idbasedlock/IdBasedLockManagerTest.java +++ b/test/junit/net/anotheria/idbasedlock/IdBasedLockManagerTest.java @@ -32,7 +32,7 @@ int getValue(){ class Worker { - protected int iterations = 1000000;//100000; + protected int iterations = 10000000;//100000; protected Random rnd = new Random(System.nanoTime()); protected long addedValue = 0; @@ -150,6 +150,7 @@ public void run(){ } @Test public void testSafelySynched(){ + long now = System.currentTimeMillis(); CountDownLatch ready = new CountDownLatch(WORKERS); CountDownLatch start = new CountDownLatch(1); CountDownLatch finish = new CountDownLatch(WORKERS); @@ -184,6 +185,7 @@ public void run(){ System.out.println("Safely synched: Workers "+workersAdded+", Counters: "+countersCounted+" -> "+(workersAdded-countersCounted)+" in "+duration+" ms , ErrorRate: "+((double)(workersAdded-countersCounted)/workersAdded)); assertEquals(workersAdded,countersCounted); assertEquals(0, ((AbstractIdBasedLockManager)lockManager).getLockSize()); + System.out.println(System.currentTimeMillis()-now); } @Test public void testUnsafelySynched(){ diff --git a/test/junit/net/anotheria/idbasedlock/ParallelLockManagerTest.java b/test/junit/net/anotheria/idbasedlock/ParallelLockManagerTest.java new file mode 100644 index 0000000..748104f --- /dev/null +++ b/test/junit/net/anotheria/idbasedlock/ParallelLockManagerTest.java @@ -0,0 +1,261 @@ +package net.anotheria.idbasedlock; + +import java.util.HashMap; +import java.util.Random; +import java.util.concurrent.CountDownLatch; + +import org.junit.Before; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class ParallelLockManagerTest { + + private static final int COUNTERS = 5; + private static final int WORKERS = 5; + + //private SafeIdBasedLockManager lockManager = new SafeIdBasedLockManager(); + + static class Counter { + private int value = 0; + + void increaseBy(int aValue) { + value += aValue; + } + + int getValue() { + return value; + } + } + + class Worker { + + protected int iterations = 10000000;//100000; + protected Random rnd = new Random(System.nanoTime()); + protected long addedValue = 0; + + protected CountDownLatch ready; + protected CountDownLatch start; + protected CountDownLatch finish; + protected ParallelLockManager lockManager; + + public Worker(CountDownLatch ready, CountDownLatch start, CountDownLatch finish) { + this(new ParallelLockManager(), ready, start, finish); + } + + public Worker(ParallelLockManager aLockManager, CountDownLatch ready, CountDownLatch start, CountDownLatch finish) { + this.ready = ready; + this.start = start; + this.finish = finish; + lockManager = aLockManager; + } + + public long getAddedValue() { + return addedValue; + } + } + + class UnsynchedWorker extends Worker implements Runnable { + + public UnsynchedWorker(CountDownLatch ready, CountDownLatch start, CountDownLatch finish) { + super(ready, start, finish); + } + + public void run() { + ready.countDown(); + try { + start.await(); + } catch (InterruptedException e) { + // + } + for (int i = 0; i < iterations; i++) { + int counterId = rnd.nextInt(COUNTERS); + int valueToAdd = 1;//rnd.nextInt(100); + counters.get("" + counterId).increaseBy(valueToAdd); + addedValue += valueToAdd; + } + finish.countDown(); + } + } + + class SynchedWorker extends Worker implements Runnable { + + public SynchedWorker(ParallelLockManager lockManager, CountDownLatch ready, CountDownLatch start, CountDownLatch finish) { + // public SynchedWorker(LockManager lockManager, CountDownLatch ready, CountDownLatch start, CountDownLatch finish) { + super(lockManager, ready, start, finish); + } + + public void run() { + ready.countDown(); + try { + start.await(); + } catch (InterruptedException e) { + // + } + for (int i = 0; i < iterations; i++) { + int counterId = rnd.nextInt(COUNTERS); + int valueToAdd = 1;//rnd.nextInt(100); + IdReentrantLock lock = lockManager.lock("" + counterId); + //SafeIdBasedLockManager.out("worker id "+counterId+" lockcount: "+lock.getReferenceCount()); + // lock.lock(); + counters.get("" + counterId).increaseBy(valueToAdd); + lock.unlock(); + // lockManager.unlock("" + counterId); + addedValue += valueToAdd; + } + finish.countDown(); + } + } + + private HashMap counters = null; + + @Before + public void init() { + counters = new HashMap(); + for (int i = 0; i < COUNTERS; i++) { + counters.put("" + i, new Counter()); + } + } + + private static long unsynchedErrors; + + @Test + public void testUnsynched() { + CountDownLatch ready = new CountDownLatch(WORKERS); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch finish = new CountDownLatch(WORKERS); + + UnsynchedWorker[] workers = new UnsynchedWorker[WORKERS]; + for (int i = 0; i < WORKERS; i++) { + workers[i] = new UnsynchedWorker(ready, start, finish); + new Thread(workers[i]).start(); + } + + try { + ready.await(); + } catch (InterruptedException e) { + } + + long startTime = System.currentTimeMillis(); + start.countDown(); + try { + finish.await(); + } catch (InterruptedException e) { + } + long duration = System.currentTimeMillis() - startTime; + + long workersAdded = 0, countersCounted = 0; + for (UnsynchedWorker worker : workers) { + workersAdded += worker.getAddedValue(); + } + + for (Counter c : counters.values()) { + countersCounted += c.getValue(); + } + + unsynchedErrors = workersAdded - countersCounted; + System.out.println("Unsynched: Workers " + workersAdded + ", Counters: " + countersCounted + " -> " + (unsynchedErrors) + " in " + duration + " ms, ErrorRate: " + + ((double) (workersAdded - countersCounted) / workersAdded)); + assertTrue(workersAdded > countersCounted); + } + + @Test + public void testSafelySynched() { + long now = System.currentTimeMillis(); + CountDownLatch ready = new CountDownLatch(WORKERS); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch finish = new CountDownLatch(WORKERS); + ParallelLockManager lockManager = new ParallelLockManager(); + + SynchedWorker[] workers = new SynchedWorker[WORKERS]; + for (int i = 0; i < WORKERS; i++) { + workers[i] = new SynchedWorker(lockManager, ready, start, finish); + new Thread(workers[i]).start(); + } + + try { + ready.await(); + } catch (InterruptedException e) { + } + long startTime = System.currentTimeMillis(); + start.countDown(); + + try { + finish.await(); + } catch (InterruptedException e) { + } + long duration = System.currentTimeMillis() - startTime; + + long workersAdded = 0, countersCounted = 0; + for (SynchedWorker worker : workers) { + workersAdded += worker.getAddedValue(); + } + + for (Counter c : counters.values()) { + countersCounted += c.getValue(); + } + System.out.println(System.currentTimeMillis() - now); + System.out.println("Safely synched: Workers " + workersAdded + ", Counters: " + countersCounted + " -> " + (workersAdded - countersCounted) + " in " + duration + " ms , ErrorRate: " + + ((double) (workersAdded - countersCounted) / workersAdded)); + assertEquals(workersAdded, countersCounted); + try { + Thread.sleep(2000);//wait for cleaner + } catch (InterruptedException e) { + e.printStackTrace(); + } + assertEquals(0, lockManager.getLockSize()); + } + + @Test + public void testUnsafelySynched() { + CountDownLatch ready = new CountDownLatch(WORKERS); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch finish = new CountDownLatch(WORKERS); + ParallelLockManager lockManager = new ParallelLockManager<>(); + + SynchedWorker[] workers = new SynchedWorker[WORKERS]; + for (int i = 0; i < WORKERS; i++) { + workers[i] = new SynchedWorker(lockManager, ready, start, finish); + new Thread(workers[i]).start(); + } + + try { + ready.await(); + } catch (InterruptedException e) { + } + long startTime = System.currentTimeMillis(); + start.countDown(); + + try { + finish.await(); + } catch (InterruptedException e) { + } + long duration = System.currentTimeMillis() - startTime; + + long workersAdded = 0, countersCounted = 0; + for (SynchedWorker worker : workers) { + workersAdded += worker.getAddedValue(); + } + + for (Counter c : counters.values()) { + countersCounted += c.getValue(); + } + + System.out.println("Unsafely synched: Workers " + workersAdded + ", Counters: " + countersCounted + " -> " + (workersAdded - countersCounted) + " in " + duration + " ms , ErrorRate: " + + ((double) (workersAdded - countersCounted) / workersAdded)); + + assertEquals(workersAdded, countersCounted); + + try { + Thread.sleep(2000);//wait for cleaner + } catch (InterruptedException e) { + e.printStackTrace(); + } + + assertEquals(0, lockManager.getLockSize()); + //System.out.println(unsynchedErrors+" "+unsynchedErrors/100); +// assertTrue("expected " + unsynchedErrors / 100 + " errors, got " + (workersAdded - countersCounted), (workersAdded - countersCounted) < (unsynchedErrors / 100)); + } + +}