主线程负责监听端口、accept连接、read操作.
启动另外一个线程负责消息的write操作.
采用包长+包体组装协议.
LinkedBlockingQueue 存储待广播的消息内容.
服务端代码:
点击(此处)折叠或打开
-
import java.io.IOException;
-
import java.net.InetSocketAddress;
-
import java.nio.ByteBuffer;
-
import java.nio.channels.SelectionKey;
-
import java.nio.channels.Selector;
-
import java.nio.channels.ServerSocketChannel;
-
import java.nio.channels.SocketChannel;
-
import java.util.Iterator;
-
import java.util.LinkedList;
-
import java.util.List;
-
import java.util.Set;
-
import java.util.concurrent.BlockingQueue;
-
import java.util.concurrent.LinkedBlockingQueue;
-
-
-
public class ServerSocketDemo2 {
-
-
private static BlockingQueue<String> bq = new LinkedBlockingQueue<>();
-
-
private static List<SocketChannel> listSc = new LinkedList<SocketChannel>();
-
-
public static void main(String[] args) throws Exception {
-
-
//1.该线程负责 写socket
-
//2.主线程维连接和读socket
-
new Thread(new Runnable() {
-
@Override
-
public void run() {
-
System.out.println("该线程负责写入socket");
-
//该线程负责写操作
-
String msg = null;
-
while (true ) {
-
try {
-
//此处会阻塞
-
msg = bq.take();
-
} catch (InterruptedException e1) {
-
e1.printStackTrace();
-
}
-
System.out.println(System.currentTimeMillis());
-
System.out.println("接收到数据"+msg);
-
-
Iterator<SocketChannel> it = listSc.iterator();
-
while(it.hasNext()) {
-
SocketChannel sc = it.next();
-
if(!sc.isConnected()) {
-
continue;
-
}
-
-
//先写入包长度
-
ByteBuffer headwb = ByteBuffer.allocate(4);
-
headwb.putInt(msg.getBytes().length);
-
//重绕缓冲区 否则下面无法进行write操作
-
headwb.flip();
-
while(headwb.hasRemaining()) {
-
try {
-
sc.write(headwb);
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
//再写入包内容
-
ByteBuffer bodywb = ByteBuffer.wrap(msg.getBytes());
-
while(bodywb.hasRemaining()) {
-
try {
-
sc.write(bodywb);
-
System.out.println("发送完成");
-
} catch (IOException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
}
-
}
-
}).start();
-
//1. 创建一个选择器
-
Selector s = Selector.open();
-
//2.创建一个服务端channel
-
ServerSocketChannel ssc = ServerSocketChannel.open();
-
//3.置为非阻塞IO
-
ssc.configureBlocking(false);
-
ssc.bind(new InetSocketAddress("127.0.0.1", 44446));
-
-
//4.将当前channel 的accept read write操作注册到 选择器中
-
ssc.register(s , SelectionKey.OP_ACCEPT);
-
//5循环接收事件
-
while(true) {
-
s.select();//该方法为阻塞方法,如果没有事件唤醒就不会执行下边的代码
-
//遍历已经唤醒的事件列表
-
Set<SelectionKey> list = s.selectedKeys();
-
Iterator<SelectionKey> it = list.iterator();
-
while (it.hasNext()) {
-
SelectionKey sk = it.next();
-
if( sk.isAcceptable() == true ) {
-
//获取channel 强制转换
-
ServerSocketChannel wsc = (ServerSocketChannel)sk.channel();
-
//设置为非阻塞模式 所以accepc操作不一定有返回值 需循环操作只到有返回值为止
-
SocketChannel sc = null;
-
while ( sc == null ) {
-
sc = wsc.accept();
-
}
-
//将该连接socket置为非阻塞状态
-
sc.configureBlocking(false);
-
//当前socket同时注册 读和写监控
-
sc.register(s,SelectionKey.OP_READ );
-
-
-
listSc.add(sc);
-
-
} else if(sk.isReadable() == true) {
-
SocketChannel sc = (SocketChannel)sk.channel();
-
//从socket读取数据写入缓冲区 先读整型的包长
-
ByteBuffer ib = ByteBuffer.allocate(4);
-
while(ib.hasRemaining()) {
-
sc.read(ib);
-
}
-
//重绕缓冲区 否则 下面的getInt不能获取准确的长度
-
ib.flip();
-
int len = ib.getInt();
-
ByteBuffer contentBuffer = ByteBuffer.allocate(len);
-
while(contentBuffer.hasRemaining()) {
-
sc.read(contentBuffer);
-
}
-
-
//写入队列
-
bq.put(new String(contentBuffer.array()));
-
-
} else {
-
throw new Exception("未知操作类型!");
-
}
-
it.remove();
-
}
-
}
-
}
-
- }
客户端代码:
点击(此处)折叠或打开
-
import java.net.InetSocketAddress;
-
import java.nio.ByteBuffer;
-
import java.nio.channels.SelectionKey;
-
import java.nio.channels.Selector;
-
import java.nio.channels.SocketChannel;
-
import java.util.Iterator;
-
import java.util.Set;
-
-
public class ClientSocketDemo {
-
-
public static void main(String[] args) throws Exception {
-
//1. 创建一个选择器
-
Selector s = Selector.open();
-
//2 创建channel
-
SocketChannel sc = SocketChannel.open();
-
sc.configureBlocking(false);
-
sc.connect(new InetSocketAddress("127.0.0.1",44446));
-
sc.register(s, SelectionKey.OP_CONNECT);
-
while( true ) {
-
s.select();
-
//遍历已经唤醒的事件列表
-
Set<SelectionKey> list = s.selectedKeys();
-
Iterator<SelectionKey> it = list.iterator();
-
while (it.hasNext()) {
-
SelectionKey sk = it.next();
-
if(sk.isReadable() == true) {
-
//获取channel
-
SocketChannel readsc = (SocketChannel)sk.channel();
-
//从socket读取数据写入缓冲区 先读整型的包长
-
ByteBuffer ib = ByteBuffer.allocate(4);
-
while(ib.hasRemaining()) {
-
readsc.read(ib);
-
}
-
//重绕缓冲区 否则 下面的getInt不能获取准确的长度
-
ib.flip();
-
int len = ib.getInt();
-
ByteBuffer contentBuffer = ByteBuffer.allocate(len);
-
while(contentBuffer.hasRemaining()) {
-
readsc.read(contentBuffer);
-
}
-
System.out.println(new String(contentBuffer.array()));
-
} else if(sk.isWritable()) {
-
//获取channel
-
SocketChannel writesc = (SocketChannel)sk.channel();
-
-
//返回字符串
-
String rstr = "hellow world";
-
//先写入包长度
-
ByteBuffer headwb = ByteBuffer.allocate(4);
-
headwb.putInt(rstr.getBytes().length);
-
//重绕缓冲区 否则下面无法进行write操作
-
headwb.flip();
-
while(headwb.hasRemaining()) {
-
writesc.write(headwb);
-
}
-
//再写入包内容
-
ByteBuffer bodywb = ByteBuffer.wrap(rstr.getBytes());
-
while(bodywb.hasRemaining()) {
-
writesc.write(bodywb);
-
}
-
//注册当前通道上的读事件 并取消掉写事件
-
writesc.register(s,sk.interestOps() & ~ SelectionKey.OP_WRITE );
-
} else if(sk.isConnectable()) {
-
//获取channel
-
SocketChannel sc1 = (SocketChannel)sk.channel();
-
while(!sc1.isConnected()) {
-
sc1.finishConnect();
-
}
-
sc1.register(s, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
-
} else {
-
throw new Exception("未知操作类型!");
-
}
-
it.remove();
-
}
-
}
-
}
-
- }