Java NIO网络编程
BIO
BIO:传统的网络通讯模型,同步阻塞IO。服务端创建一个ServerSocket, 客户端用Socket去连接服务端的ServerSocket, ServerSocket接收到了一个的连接请求就会创建一个Socket和一个线程去跟那个Socket进行通讯。接着客户端和服务端就进行阻塞式的通信:
阻塞式具体体现在:
①服务端等待连接请求
②创建一个线程跟客户端进行和通信阻塞等待客户端发送的数据。
③客户端阻塞等待服务器端处理后返回响应,在响应返回前,客户端那边就阻塞等待,什么事都做不了。
缺点:
①每次一个客户端接入,都需要创建一个线程来服务这个客户端。当客户端过多时,就会造成服务端的线程数量可能达到了几千几万,这样就可能会造成服务端过载过高,最后宕机
②阻塞时什么事都做不了,只能阻塞等待,浪费资源。
服务器代码会在A行代码中阻塞,一直等待客户端连接。一旦有客户端连接就能得到一个Socket连接,接着就会继续往下阻塞等待客户端的输入信息(一下代码在正式使用时会增加线程池,当有连接时,从线程池获得一个线程执行操作)
服务端代码样例
public class TCPServer {
public static void main(String[] args) {
ServerSocket serverSocket = null;
Socket sc = null;
try {
serverSocket = new ServerSocket(9999);
} catch (IOException e) {
e.printStackTrace();
}
while (true){
try {
sc = serverSocket.accept(); //A
String ip = sc.getInetAddress().getHostAddress();
System.out.println(ip+"已连接");
InputStream is = sc.getInputStream();//B
byte[] b = new byte[1024];
is.read(b);
System.out.println(ip+":"+new String(b));
OutputStream outputStream = sc.getOutputStream();
outputStream.write("知道了".getBytes());
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
sc.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
客户端代码样例
C行代码会一直阻塞直到服务器端做出响应
public class TCPClient {
public static void main(String[] args) {
while (true){
try {
Socket s = new Socket("127.0.0.1",9999);
OutputStream outputStream = s.getOutputStream();
System.out.println("请输入");
Scanner scanner = new Scanner(System.in);
String msg = scanner.nextLine();
outputStream.write(msg.getBytes());
InputStream inputStream = s.getInputStream();//C
byte[] b = new byte[1024];
inputStream.read(b);
System.out.println("服务器:"+new String(b));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
NIO
核心组件
- 通道(Channel)
首先说一下Channel,“通道/连接”。Stream是单向的,譬如:InputStream, OutputStream是单向的而Channel是双向的,既可以用来进行读操作,又可以用来进行写操作。网络NIO中的Channel也是双向的,可以从另一端获得数据也可以向另一端写入数据。
1)FileChannel(文件IO)
2)DatagramChannel(UDP)
3)SocketChannel(ClientTCP)
4)ServerSocketChannel(ClientTCP) - 缓冲区(Buffer)
NIO中的关键Buffer实现有
ByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer,
ShortBuffer - 选择器(Selectors)
Selectors(选择器)能够检测一到多个NIO通道,并能够知道连接是否做好准备的组件。一个单独的线程就可以管理多个channel,从而管理多个网络连接。要使用Selector, 得向Selector注册Channel。轮询一个或多个NIO Channel的状态是否有事件发生。有相应事件发生则执行相应逻辑。这样既可实现了利用一个线程无管理多数连接。不用再一个客户端一个连接。避免了多个线程的切换与维护。 - selectionKey:代表socket与Select之间发生的事件关系。
OP_ACCEPT:有新的网络连接可以建立(accept),值为16
OP_CONNECT:连接已经建立,值为8
OP_READ/OP_WRITE 读写操作 值为1,4
服务端代码样例
public class ChatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private static final int PORT = 9999;
public ChatServer() {
try {
//得到ServerSocketChannel
serverSocketChannel = ServerSocketChannel.open();
//得到selector
selector = Selector.open();
//绑定端口号
serverSocketChannel.bind(new InetSocketAddress(PORT));
//设置非阻塞方式
serverSocketChannel.configureBlocking(false);
//设置selector给ServerSocketChannel
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("server reday");
} catch (IOException e) {
e.printStackTrace();
}
}
public void start() throws IOException {
//准备执行任务
while (true) {
//每四秒监控一下客户端,返回值为就绪的channel连接数量
if (selector.select(4000) == 0) {
//非阻塞式,可以执行其他的事务
System.out.println("没有准备就绪的连接,或已注册的连接");
continue;
}
//有连接就绪了,得到selectionKey判断事件
//得到所有的发生事件
Iterator<SelectionKey> selectionKeyIterable = selector.selectedKeys().iterator();
while (selectionKeyIterable.hasNext()) {
SelectionKey selectionKey = selectionKeyIterable.next();
if (selectionKey.isAcceptable()) {
System.out.println("isAcceptable");
SocketChannel socketChannel = null;
//客户端连接事件
socketChannel = serverSocketChannel.accept();
//设置非阻塞方式
socketChannel.configureBlocking(false);
//将该通道注册到监控器中,注重监听读事件
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println(socketChannel.getRemoteAddress());
} else if (selectionKey.isReadable()) {
readMsg(selectionKey);
}
selectionKeyIterable.remove();
}
}
}
/** * 收到消息,并且广播 * * @param selectionKey */
private void readMsg(SelectionKey selectionKey) throws IOException {
//获得通道
Channel channel = selectionKey.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int count = ((SocketChannel) channel).read(byteBuffer);
if (count > 0) {
String msg = new String(byteBuffer.array());
System.out.println(msg);
//发送广播
broadCast(msg, channel);
}
}
/** * 给所有的连接发送消息 * * @param msg * @param expect */
private void broadCast(String msg, Channel expect) throws IOException {
for (SelectionKey key : selector.keys()) {
Channel channel = key.channel();
if (channel instanceof SocketChannel && channel != expect) {
ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
((SocketChannel) channel).write(byteBuffer);
}
}
}
public static void main(String[] args) {
ChatServer chatServer = new ChatServer();
try {
chatServer.start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端样例代码
public class ChatClient {
private String userName;
private final static String HOST="127.0.0.1";
private int port=9999;
private SocketChannel socketChannel;
public ChatClient() {
try {
//得到一个网络连接
socketChannel = SocketChannel.open();
//设置阻塞方式
socketChannel.configureBlocking(false);
//封装连接地址
InetSocketAddress address = new InetSocketAddress(HOST,port);
//使连接连上客户端
if (!socketChannel.connect(address)){
//连接失败
while (!socketChannel.finishConnect()){
//未连接上,持续连接,在连接过程中非阻塞
System.out.println("正在连接服务器....");
}
}
userName = socketChannel.getLocalAddress().toString();
} catch (IOException e) {
e.printStackTrace();
}
}
public void sendMsg(String msg) throws IOException {
if (msg.equals("bye")){
socketChannel.close();
return;
}
ByteBuffer byteBuffer = ByteBuffer.wrap((userName+":"+msg).getBytes());
socketChannel.write(byteBuffer);
}
public void receiveMsg() throws IOException {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(byteBuffer);
if (count > 0) {
String msg = new String(byteBuffer.array());
System.out.println(msg.trim());
}
}
public static void main(String[] args) {
ChatClient chatClient = new ChatClient();
new Thread(){
@Override
public void run() {
while (true){
try {
chatClient.receiveMsg();
Thread.sleep(2000);
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.nextLine();
try {
chatClient.sendMsg(msg);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
还没有评论,来说两句吧...