Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions java/net/anotheria/idbasedlock/IdReentrantLock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package net.anotheria.idbasedlock;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author
*/
public class IdReentrantLock<T> extends ReentrantLock {

private T id;
private AtomicInteger refs = new AtomicInteger(0);

IdReentrantLock(T id) {
this.id = id;
}

public T getId() {
return id;
}

public int getRefs() {
return refs.get();
}

@Override
public void lock() {
refs.incrementAndGet();
super.lock();
}

@Override
public void unlock() {
refs.decrementAndGet();
super.unlock();
}

@Override
protected Thread getOwner() {
return super.getOwner();
}
}
73 changes: 73 additions & 0 deletions java/net/anotheria/idbasedlock/ParallelLockManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package net.anotheria.idbasedlock;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.StampedLock;

public class ParallelLockManager<T> {

private Map<T, IdReentrantLock<T>> locks = new ConcurrentHashMap<>();

// private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
// private final Lock readLock = rwl.readLock();
// private final Lock writeLock = rwl.writeLock();
// private StampedLock stampedLock = new StampedLock();

private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

public ParallelLockManager() {
initLockCleaner();
}

private void initLockCleaner() {
long initDelay = TimeUnit.SECONDS.toMillis(1);
long period = TimeUnit.SECONDS.toMillis(1);
scheduler.scheduleAtFixedRate(new RunnableJob(), initDelay, period, TimeUnit.MILLISECONDS);
}

private class RunnableJob implements Runnable {
@Override
public void run() {
if (locks == null || locks.size() == 0) {
return;
}

// long stamp = stampedLock.writeLock();
try {
locks.entrySet().removeIf(e -> e.getValue().getRefs() <= 0);
} catch (Exception e) {
e.printStackTrace();
} finally {
// stampedLock.unlockWrite(stamp);
//writeLock.unlock();
}
}
}

public IdReentrantLock<T> lock(T id) {
if (id == null) {
throw new IllegalArgumentException("getLock() failed: id can't be null");
}

// long stamp = stampedLock.readLock();
// readLock.lock();
try {
IdReentrantLock<T> lock = locks.computeIfAbsent(id, v -> new IdReentrantLock<>(id));

lock.lock();

return lock;
} finally {
// stampedLock.unlockRead(stamp);
//// readLock.unlock();
}

}

int getLockSize() {
return locks.size();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ int getValue(){

class Worker {

protected int iterations = 1000000;//100000;
protected int iterations = 10000000;//100000;
protected Random rnd = new Random(System.nanoTime());
protected long addedValue = 0;

Expand Down Expand Up @@ -150,6 +150,7 @@ public void run(){
}

@Test public void testSafelySynched(){
long now = System.currentTimeMillis();
CountDownLatch ready = new CountDownLatch(WORKERS);
CountDownLatch start = new CountDownLatch(1);
CountDownLatch finish = new CountDownLatch(WORKERS);
Expand Down Expand Up @@ -184,6 +185,7 @@ public void run(){
System.out.println("Safely synched: Workers "+workersAdded+", Counters: "+countersCounted+" -> "+(workersAdded-countersCounted)+" in "+duration+" ms , ErrorRate: "+((double)(workersAdded-countersCounted)/workersAdded));
assertEquals(workersAdded,countersCounted);
assertEquals(0, ((AbstractIdBasedLockManager)lockManager).getLockSize());
System.out.println(System.currentTimeMillis()-now);
}

@Test public void testUnsafelySynched(){
Expand Down
Loading