-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathZKDistrubutedLock.java
More file actions
144 lines (129 loc) · 4.26 KB
/
ZKDistrubutedLock.java
File metadata and controls
144 lines (129 loc) · 4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package simpleDistLockTest;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
/**
* 一个基于zookeeper的分布式锁
* @author Xiupitter
*
*/
public class ZKDistrubutedLock implements Watcher{
private ZooKeeper zk;
private String keyLocked;
private String node;
private Watcher defaultWatcher;
private CountDownLatch connectLock= new CountDownLatch(1);
private CountDownLatch lock= new CountDownLatch(1);
/**
* the key to lock
* @param key
*/
public ZKDistrubutedLock(String key) {
// TODO Auto-generated constructor stub
this.keyLocked ="/" +key;
defaultWatcher = new LockWatcher();
try {
zk = new ZooKeeper("127.0.0.1:2181", 5*1000, this);
connectLock.await();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public synchronized void lockInterruptibly() throws KeeperException, InterruptedException {
node = zk.create(keyLocked+"/lock", (" ").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
SortedSet<String> set = new TreeSet<String>(zk.getChildren(keyLocked, defaultWatcher));
if(node.equals(keyLocked+"/"+set.first())){//success
}else{//fail
lock.await();
}
return;
}
public synchronized boolean tryLock() throws KeeperException, InterruptedException {
// TODO Auto-generated method stub
node = zk.create(keyLocked+"/lock", (" ").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
SortedSet<String> set = new TreeSet<String>(zk.getChildren(keyLocked, false));
if(node.equals(keyLocked+"/"+set.first())){//success
zk.delete(node, -1);
return true;
}else{
zk.delete(node, -1);
return false;
}
}
public synchronized boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException, KeeperException {
// TODO Auto-generated method stub
node = zk.create(keyLocked+"/lock", (" ").getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
SortedSet<String> set = new TreeSet<String>(zk.getChildren(keyLocked, defaultWatcher));
if(node.equals(keyLocked+"/"+set.first())){//success
return true;
}else{//fail
lock.await(timeout, unit);
}
return false;
}
public synchronized void unlock() throws KeeperException, InterruptedException {
// TODO Auto-generated method stub
//zk.delete(node, -1);
zk.close();
}
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
if(event.getState().equals(Event.KeeperState.SyncConnected)){
try {
if(zk!=null){
if(zk.exists(keyLocked, false)==null){
zk.create(keyLocked, "a lock under the root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
connectLock.countDown();
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private class LockWatcher implements Watcher{
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
if(event.getState().equals(Event.KeeperState.SyncConnected)){
if(event.getPath().contains(keyLocked)&&event.getType().equals(Event.EventType.NodeChildrenChanged)){
try {
SortedSet<String> set = new TreeSet<String>(zk.getChildren(keyLocked, defaultWatcher));
System.out.println(keyLocked+"/"+set.first()+" "+node);
if(node.equals(keyLocked+"/"+set.first())){
lock.countDown();
}
} catch (KeeperException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
}