点击(此处)折叠或打开
-
package com.test.zookeeper;
-
-
import java.io.IOException;
-
import java.util.concurrent.CountDownLatch;
-
-
import org.apache.zookeeper.WatchedEvent;
-
import org.apache.zookeeper.Watcher;
-
import org.apache.zookeeper.ZooKeeper;
-
import org.apache.zookeeper.Watcher.Event.KeeperState;
-
-
class ConnectionWatcher implements Watcher {
-
-
private static final int SESSION_TIMEOUT = 5000;
-
-
protected ZooKeeper zk;
-
private CountDownLatch connectedSignal = new CountDownLatch(1);
-
-
public void connect(String hosts) throws IOException, InterruptedException {
-
zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);
-
connectedSignal.await();
-
}
-
-
@Override
-
public void process(WatchedEvent event) {
-
if (event.getState() == KeeperState.SyncConnected) {
-
connectedSignal.countDown();
-
}
-
}
-
-
public void close() throws InterruptedException {
-
zk.close();
-
}
- }
1.创建组节点
创建持久节点代表一个组.
点击(此处)折叠或打开
-
package com.test.zookeeper;
-
-
import org.apache.zookeeper.CreateMode;
-
import org.apache.zookeeper.KeeperException;
-
import org.apache.zookeeper.ZooDefs.Ids;
-
-
public class CreateGroup extends ConnectionWatcher {
-
-
public void create(String groupName) throws KeeperException,InterruptedException {
-
String path = "/" + groupName;
-
//创建一个空数据的持久节点 访问权限为public
-
String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
-
System.out.println("Created " + createdPath);
-
}
-
-
public static void main(String[] args) throws Exception {
-
CreateGroup createGroup = new CreateGroup();
-
createGroup.connect("192.168.140.128:2181");
-
createGroup.create("DemoGroup");
-
createGroup.close();
-
}
- }
创建临时节点,会话中断则节点被删除.
点击(此处)折叠或打开
-
package com.test.zookeeper;
-
-
-
import org.apache.zookeeper.CreateMode;
-
import org.apache.zookeeper.KeeperException;
-
import org.apache.zookeeper.ZooDefs.Ids;
-
-
public class JoinGroup extends ConnectionWatcher {
-
public void join(String groupName, String memberName) throws KeeperException,InterruptedException {
-
String path = "/" + groupName + "/" + memberName;
-
//为组成员创建临时节点
-
//data中可以存储当前节点服务器的配置信息 例如主机名等
-
String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
-
System.out.println("Created " + createdPath);
-
}
-
public static void main(String[] args) throws Exception {
-
JoinGroup joinGroup = new JoinGroup();
-
joinGroup.connect("192.168.140.128:2181");
-
joinGroup.join("DemoGroup", "node1");
-
-
//通过sleep模拟组成员正在执行的任务
-
Thread.sleep(Long.MAX_VALUE);
-
}
- }
点击(此处)折叠或打开
-
package com.test.zookeeper;
-
-
import java.util.List;
-
-
import org.apache.zookeeper.WatchedEvent;
-
import org.apache.zookeeper.Watcher;
-
import org.apache.zookeeper.Watcher.Event.EventType;
-
import org.apache.zookeeper.ZooKeeper;
-
-
public class ListGroup extends ConnectionWatcher {
-
public void list(String groupName) throws Exception{
-
String path = "/" + groupName;
-
List<String> children = zk.getChildren(path,new ChildWatcher(zk,path));
-
if (children.isEmpty()) {
-
System.out.printf("No members in group %s\n", groupName);
-
System.exit(1);
-
}
-
for (String child : children) {
-
System.out.println(child);
-
}
-
}
-
public static void main(String[] args) throws Exception {
-
ListGroup listGroup = new ListGroup();
-
listGroup.connect("192.168.140.128:2181");
-
listGroup.list("DemoGroup");
-
//观察者会在子节点发生变化之后打印节点列表
-
Thread.sleep(Long.MAX_VALUE);
-
//listGroup.close();
-
}
-
}
-
/**
-
* 子节点观察者
-
*/
-
class ChildWatcher implements Watcher{
-
protected ZooKeeper zk = null;
-
private String path = null;
-
public ChildWatcher(ZooKeeper zk,String path) {
-
this.zk = zk;
-
this.path=path;
-
}
-
@Override
-
public void process(WatchedEvent event) {
-
try {
-
if(event.getType() == EventType.NodeChildrenChanged){
-
System.out.println("子节点反生了变化");
-
//递归调用
-
List<String> cs = zk.getChildren(path, new ChildWatcher(zk,path));
-
for(String c : cs){
-
System.out.println(c);
-
}
-
}
-
} catch (Exception e) {
-
e.printStackTrace();
-
}
-
}
-
- }
点击(此处)折叠或打开
-
package com.test.zookeeper;
-
-
import java.util.List;
-
-
public class DeleteGroup extends ConnectionWatcher {
-
//删除组下所有成员
-
public void delete(String groupName) throws Exception {
-
String path = "/" + groupName;
-
List<String> children = zk.getChildren(path, false);
-
for (String child : children) {
-
//1.提供节点路径和版本号,如果版本号和当前节点版本号一致则删除(乐观锁机制)
-
//2.绕过版本检测机制
-
zk.delete(path + "/" + child, -1);
-
}
-
zk.delete(path, -1);
-
}
-
public static void main(String[] args) throws Exception {
-
DeleteGroup deleteGroup = new DeleteGroup();
-
deleteGroup.connect("192.168.140.128:2181");
-
deleteGroup.delete("DemoGroup");
-
deleteGroup.close();
-
}
- }