连接服务器，设置默认 Watcher（default watch）
点击(此处)折叠或打开
-
package com.test.zookeeper.barrier;
-
-
import java.io.IOException;
-
-
import org.apache.zookeeper.WatchedEvent;
-
import org.apache.zookeeper.Watcher;
-
import org.apache.zookeeper.ZooKeeper;
-
-
public class SyncPrimitive implements Watcher {
-
static ZooKeeper zk = null;
-
static Integer mutex;
-
static int i=0;
-
String root;
-
-
SyncPrimitive(String address) {
-
if(zk == null){
-
try {
-
System.out.println("Starting ZK:");
-
zk = new ZooKeeper(address, 3000, this);
-
mutex = new Integer(-1);
-
System.out.println("Finished starting ZK: " + zk);
-
} catch (IOException e) {
-
System.out.println(e.toString());
-
zk = null;
-
}
-
}
-
}
-
-
synchronized public void process(WatchedEvent event) {
-
synchronized (mutex) {
-
i++;
-
System.out.println(i);
-
mutex.notify();
-
}
-
}
- }
1. Barrier 实现类
点击(此处)折叠或打开
-
package com.test.zookeeper.barrier;
-
-
import java.net.InetAddress;
-
import java.net.UnknownHostException;
-
import java.util.List;
-
-
import org.apache.zookeeper.CreateMode;
-
import org.apache.zookeeper.KeeperException;
-
import org.apache.zookeeper.ZooDefs.Ids;
-
import org.apache.zookeeper.data.Stat;
-
-
public class Barrier extends SyncPrimitive {
-
-
private int size=0;
-
private String root=null;
-
private String name;
-
-
Barrier(String address, String root, int size) {
-
super(address);
-
this.root = root;
-
this.size = size;
-
if (zk != null) {
-
try {
-
Stat s = zk.exists(root, false);
-
if (s == null) {
-
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
-
}
-
} catch (KeeperException e) {
-
System.out.println("Keeper exception when instantiating queue: " + e.toString());
-
} catch (InterruptedException e) {
-
System.out.println("Interrupted exception");
-
}
-
}
-
-
try {
-
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
-
} catch (UnknownHostException e) {
-
System.out.println(e.toString());
-
}
-
-
}
-
/**
-
* Join barrier
-
*
-
* @return
-
* @throws KeeperException
-
* @throws InterruptedException
-
*/
-
-
boolean enter() throws KeeperException, InterruptedException{
-
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
-
while (true) {
-
synchronized (mutex) {
-
//此处 第二个参数设置为true 调用父类中默认的watch
-
List<String> list = zk.getChildren(root, true);
-
-
if (list.size() < size) {
-
//如果进入栅栏的节点数小于设置的size值 则阻塞等待
-
mutex.wait();
-
} else {
-
return true;
-
}
-
}
-
}
-
}
-
-
/**
-
* Wait until all reach barrier
-
*
-
* @return
-
* @throws KeeperException
-
* @throws InterruptedException
-
*/
-
-
boolean leave() throws KeeperException, InterruptedException{
-
zk.delete(root + "/" + name, 0);
-
while (true) {
-
synchronized (mutex) {
-
List<String> list = zk.getChildren(root, true);
-
if (list.size() > 0) {
-
mutex.wait();
-
} else {
-
return true;
-
}
-
}
-
}
-
}
- }
2. 测试调用代码
点击(此处)折叠或打开
-
package com.test.zookeeper.barrier;
-
-
import java.util.Random;
-
-
import org.apache.zookeeper.KeeperException;
-
-
public class BarrierDemo {
-
public static void main(String[] args) {
-
//设置栅栏通过数量为3
-
barrierTest(new String[]{"","192.168.140.128:2181","3"});
-
}
-
-
public static void barrierTest(String args[]) {
-
Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
-
try{
-
boolean flag = b.enter();
-
System.out.println("Entered barrier: " + args[2]);
-
if(!flag) System.out.println("Error when entering the barrier");
-
} catch (KeeperException e){
-
-
} catch (InterruptedException e){
-
-
}
-
-
// Generate random integer
-
Random rand = new Random();
-
int r = rand.nextInt(100);
-
// Loop for rand iterations
-
for (int i = 0; i < r; i++) {
-
try {
-
Thread.sleep(100);
-
} catch (InterruptedException e) {
-
-
}
-
}
-
try{
-
b.leave();
-
} catch (KeeperException e){
-
-
} catch (InterruptedException e){
-
-
}
-
System.out.println("Left barrier");
-
}
- }