Java-NIO
2.1 传统的BIO编程
网络编程的基本模型是Client/Server模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定IP地址和监听端口),客户端通过连接操作向服务器监听的地址发起连接请求,通过三次握手建立连接,如果连接成功建立,双方就可以通过网络套接字(socket). 在基于传统同步阻塞模型开发中,ServerSocket负责绑定IP地址,启动监听端口,Socket负责发起连接操作,连接成功后,双方通过输入和输出流进行同步阻塞式通信。
2.1.1 BIO通信模型图
典型的一请求一应答通信模型
采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端的连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。

缺点:
缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈 1:1的正比关系,由于线程是java虚拟机非常宝贵的系统资源,当线程数膨胀后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新的线程失败等问题,并最终导致进程怠机或者僵死,不能对外提供服务。
2.1.2 同步阻塞式I/O代码示例
2.1.2.1 Server 服务端
public static void main(String[] args) {
int port = 8889;
Socket accept = null;
try {
ServerSocket serverSocket = new ServerSocket(port);
while(true){
accept = serverSocket.accept();
new Thread(new ServerSokcetHandler(accept)).start();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
if (accept!=null) {
try {
accept.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public class ServerSokcetHandler implements Runnable{
private Socket socket;
public ServerSokcetHandler(Socket socket) {
// TODO Auto-generated constructor stub
this.socket = socket;
}
@Override
public void run() {
// TODO Auto-generated method stub
try {
OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
PrintWriter writer = new PrintWriter(outputStream);
long time = System.currentTimeMillis();
String message = "";
while((message = reader.readLine())!=null){
System.out.println("#####"+message);
if("time".equals(message)){
writer.write("now is : "+new Date(time));
writer.flush();
System.out.println("时间是:"+new Date(time));
}
writer.println("已经收到信息");
writer.flush();
}
reader.close();
writer.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}2.1.2.2 Client 客户端
public static void main(String[] args) {
int port = 8889;
String host= "127.0.0.1";
Socket socket = null;
try {
socket = new Socket(host,port);
InputStream inputStream = socket.getInputStream();
OutputStream outputStream = socket.getOutputStream();
PrintWriter writer = new PrintWriter(outputStream,true);
Scanner scanner = new Scanner(System.in);
String next = scanner.next();
writer.println(next);
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
String message=reader.readLine();
System.out.println("收到消息"+message);
reader.close();
outputStream.close();
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
if(socket!=null){
try {
socket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}2.1.3 同步阻塞式I/O整理
BIO的主要问题:
在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程处理新的接入客户端链路,一个线程只能处理一个客户端连接,在高性能服务器应用领域,往往需要面向成千上万个客户端的并发连接,这种模型显然无法满足高性能、高并发接入场景。
2.2 伪异步I/O编程。
为了改进一线程一连接模型,后来演进出了一个通过线程池或者消息队列实现1个或者多个线程处理N个客户端的模型,由于它的底层通信机制依然是沿用同步阻塞I/O,所以被称为“伪异步”。
为了解决同步阻塞I/O面临的一个链路需要一个线程处理的问题,后来有人对它的线程 模型进行了优化,后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N的比例关系,其中M可以远远大于N,通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。
2.2.1 伪异步I/O模型图
当有新的客户端接入的时候,将客户端的socket封装成一个Task(改任务实现java.lang.RUnnnable接口)投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列和N个活跃线程对消息队列中的任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽好怠机。

2.2.2 伪异步I/O
2.2.2.1 线程类
public class TimeServerHandlerExecutePool {
private ExecutorService execute;
public TimeServerHandlerExecutePool(int maxPoolSize,int queueSize){
execute = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), maxPoolSize, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueSize));
}
public void execute(Runnable task){
execute.execute(task);
}
}2.2.2.2服务启动类
public class ThreadServerDemo {
public static void main(String[] args) {
ServerSocket serverSocket = null;
try {
serverSocket = new ServerSocket(8889);
TimeServerHandlerExecutePool pool = new TimeServerHandlerExecutePool(50,1000);//创建I/O任务线程池
while(true){
Socket accept = serverSocket.accept();
pool.execute(new ThreadServerSokcetHandler(accept));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
if(serverSocket!=null){
try {
serverSocket.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}2.2.2.3 服务业务处理类
public class ThreadServerSokcetHandler implements Runnable{
private Socket socket;
public ThreadServerSokcetHandler(Socket socket) {
// TODO Auto-generated constructor stub
this.socket = socket;
}
@Override
public void run() {
// TODO Auto-generated method stub
try {
OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8"));
PrintWriter writer = new PrintWriter(outputStream);
long time = System.currentTimeMillis();
String message = "";
while((message = reader.readLine())!=null){
System.out.println("#####"+message);
if("time".equals(message)){
writer.write("now is : "+new Date(time));
writer.flush();
System.out.println("时间是:"+new Date(time));
}
writer.println("已经收到信息");
writer.flush();
}
reader.close();
writer.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}2.2.3 伪异步I/O说明
伪异步I/O的主函数代码发生了变化,我们首先创建一个时间服务器处理类的线程池,当接收到新的客户端连接的时候,将请求Socket封装成一个Task,然后调用线程池的execute方法执行,从而避免了每个请求接入都创建一个新的线程。
由于线程池和消息队列都是有界的,因此,无论客户端并发连接数多大,它都不会导致线程个数过于膨胀或者内存溢出,相比于传统的一连接一线程模型,是一种改良。
伪异步I/O通信框架采用了线程池实现,因此避免了为每个请求都创建一个独立线程造成的线程资源耗尽问题。但是由于它底层的通信依然采用同步阻塞模型,因此无法从根本上解决问题。
原因:
(1).服务端处理缓慢,返回应答消息耗费60s,平时只需要10ms
(2)采用伪异步I/O的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,因此,它将会被同步阻塞60s.
(3)假如所有的可用线程都被故障服务器阻塞,那后续所有的I/O消息都将在队列中排队。
(4)由于线程池采用阻塞队列实现,当队列积满之后,后续入队列的操作将被阻塞。
(5)由于前端只有一个accptor线程接收客户端接入,它被阻塞在线程池的同步阻塞队列后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时。
(6)由于几乎所有的连接都超时,调用这会认为系统已经崩溃,无法接收新的请求消息。
2.3 NIO编程(Non-block I/O)
与socket类和ServerSocket类相对应,NIO也提供了SocketChannel 和ServerSocketChannel两种不同的套接字通道实现。这两种新增的通道都支持阻塞和非阻塞两种模式。阻塞模式使用非常简单,但是性能和可靠性都不好,非阻塞模式则正好相反。
同步阻塞I/O: 低负载、低并发的应用程序。(可降低编程复杂度)
NIO非阻塞: 高负载、高并发的网络应用。2.3.1 NIO类库简介
新的输入/输出 (NIO) 库是在 JDK 1.4 中引入的。NIO 弥补了原来同步阻塞I/O 的不足,它在标准 Java 代码中提供了高速的、面向块的 I/O。通过定义包含数据的类,以及通过以块的形式处理这些数据,NIO 不用使用本机代码就可以利用底层优化,这是原来的 I/O 包所无法做到的
2.3.1.1 缓冲区buffer
Buffer 是一个对象, 它包含一些要写入或者要读出的数据。 在 NIO类库 中加入 Buffer 对象,体现了新库与原 I/O 的一个重要区别。在面向流的 I/O 中,我们将数据直接写入或者将数据直接读到 Stream 对象中。
在 NIO 库中,所有数据都是用缓冲区进行处理的。在读取数据时,它是直接读到缓冲区中;在写入数据时,它也是写入到缓冲区中。任何时候访问 NIO 中的数据,我们都是通过缓冲区进行读写操作。
缓冲区实质上是一个数组。通常它是一个字节数组(ByteBuffer),也可以使用其它种类的数组。但是一个缓冲区不仅仅是一个数组,缓冲区提供了对数据的结构化访问,及维护读写位置(limit)等信息。
最常用的缓冲区是ByteBuffer,一个ByteBuffer提供了一组功能用于操作byte数组。除了ByteBuffer,还有其它的一些缓冲区,事实上,每一种Java基本类型(除了Boolean类型)都对应有一种缓冲区,如下所示:
ByteBuffer:字节缓冲区
CharBuffer:字符缓冲区
ShortBuffer:短整型缓冲区
IntBuffer:整形缓冲区
LongBuffer:长整形缓冲区
FloatBuffer:浮点型缓冲区
DoubleBuffer:双精度浮点型缓冲区每一个Buffer类都是Buffer接口的一个子实例。除了 ByteBuffer,每一个 Buffer 类都有完全一样的操作,只是它们所处理的数据类型不一样。因为大多数标准I/O操作都使用ByteBuffer,所以它除了具有一般缓冲区的操作之外还提供一些特有的操作,方便网络读写。
2.3.1.2 通道channel
Channel是一个通道,可以通过它读取和写入数据,它就像自来水管一样,网络数据通过Channel读取和写入。通道与流的不同之处在于通道是双向的。而流只是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道可以用于读、写或者同时用于读写。
因为Channel是全双工的,所以它可以比流更好地映射底层操作系统的API。特别是在UNIX网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。
自顶向下,前三层主要是Channel接口,用于定义它的功能,后面是一些具体的功能类(抽象类),从类图可以看出,实际上Channel可以分为两大类,分别是用于网络读写的
SelectableChannel和用于文件操作的FileChannel。
ServerSocketChannel和SocketChannel都是SelectableChannel的子类
2.3.1.3 多路复用器Selector
它是JAVA NIO编程的基础,熟练的掌握Selector对于掌握NIO编程至关重要。多路复用器提供选择已经就绪的任务的能力。简单来讲,Selector会不断的轮询注册在其上的Channel,如果某个Channel上面有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合进行后续的IO操作。
一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端,这的确是一个巨大的改进。
2.3.2 NIO服务端序列图

2.3.3 NIO客户端序列图

2.3.4 代码演示
2.3.4.1 server端
public class TimeServer {
public static void main(String[] args) {
MultiTimeServerHandler handler = new MultiTimeServerHandler(8800);
new Thread(handler,"nio-100").start();
}
}
public class MultiTimeServerHandler implements Runnable{
private Selector selector;//多路复用器
private ServerSocketChannel serverChannel;//客户端连接父管道
private volatile boolean stop;
public MultiTimeServerHandler(int port) {
// TODO Auto-generated constructor stub
try {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器启动监听端口:"+port);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
this.stop = true;
}
@Override
public void run() {
// TODO Auto-generated method stub
while (!stop) {
try {
selector.select(1000);//1秒选择一次
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key = null;
while(iterator.hasNext()){
key = iterator.next();
iterator.remove();
try {
handlerInput(key);
} catch (Exception e) {
// TODO: handle exception
key.cancel();
if(key.channel()!=null){
key.channel().close();
}
}
}
} catch (Throwable e) {
// TODO: handle exception
e.printStackTrace();
}
}
//多路复用器关闭后,所有注册在上面的Channel和pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if(selector!=null){
try {
selector.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void handlerInput(SelectionKey key) throws IOException{
//处理新接入的请求信息
if(key.isValid()){
SelectableChannel channel = key.channel();
ServerSocketChannel ssc = (ServerSocketChannel) channel;
SocketChannel accept = ssc.accept();
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_READ);
}
if(key.isReadable()){
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes>0){
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
System.out.println("the server receive order: "+body);
doWrite(sc,"cccccc");
}else if(readBytes<0){
//对链路关闭
key.cancel();
sc.close();
}else{
;
}
}
}
private void doWrite(SocketChannel sc,String response) throws IOException{
if(response!=null && response.trim().length()>0){
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
sc.write(writeBuffer);
}
}
}2.3.4.2 client客户端
public class TimeClient {
public static void main(String[] args) {
try {
new Thread(new MutilTimeClientHandler("127.0.0.1", 8800),"client-001").start();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public class MutilTimeClientHandler implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private volatile boolean stop;
public MutilTimeClientHandler(String host,int port) throws IOException{
this.host = host == null ?"127.0.0.1":host;
this.port = port;
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
// TODO Auto-generated method stub
try {
doConnect();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.exit(1);
}
while(!stop){
try {
selector.select(1000);//1秒唤醒
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
SelectionKey key = null;
while(iterator.hasNext()){
key = iterator.next();
iterator.remove();
try {
handlerInput(key);
} catch (Exception e) {
// TODO: handle exception
if(key!=null){
key.cancel();
if(key.channel()!=null){
key.channel().close();
}
}
}
}
} catch (Exception e) {
// TODO: handle exception
e.printStackTrace();
System.exit(1);
}
}
if (selector!=null) {
try {
selector.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void handlerInput(SelectionKey selectionKey) throws IOException{
if(selectionKey.isValid()){
SocketChannel sc =(SocketChannel)selectionKey.channel();
if(selectionKey.isConnectable()){
System.out.println("finshed: "+ sc.finishConnect());
if(sc.finishConnect()){
sc.register(selector, SelectionKey.OP_READ);
doWrite(sc);
}else{
System.exit(1);
}
}
if(selectionKey.isReadable()){
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if(readBytes>0){
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String data = new String(bytes, "UTF-8");
System.out.println(data);
this.stop = true;
}else if(readBytes<0){
//链路关闭
selectionKey.cancel();
socketChannel.close();
}else{
;
}
}
}
}
private void doConnect() throws IOException{
//直接连接成功,则注册到多路复用器上,发送请求消息,读应答
if(socketChannel.connect(new InetSocketAddress(host, port))){
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
}
private void doWrite(SocketChannel sc) throws IOException{
byte[] bytes = "query time order".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
sc.write(writeBuffer);
if(!writeBuffer.hasRemaining()){
System.out.println("send order 2 server Successed");
}
}
}2.3.5 使用NIO编程的优点
1.客户端发起的连接操作是异步的,可以通过多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。
2.SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信线程就可以处理其他的链路,不需要同步等待这个链路可用。
3.线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,因此,它非常适合做高性能、高负载的网络服务器。
2.4 AIO和NIO的区别
NIO是同步的,它需要tomcat去轮询。连接多且短的时候,轮询效率较高,但如果大多数连接耗时都比较长,则会增加轮询开销,空耗很多cpu周期。
AIO是异步的,无需轮询,但需要操作系统参与,因此如果连接多而短,就会在内核和用户态之间频繁切换,性能反而不如轮询。
2.5 选择Netty的理由
开发出高质量的NIO程序并不是一件简单的事情,除去NIO固有的复杂性和BUG不谈,作为一个NIO服务端需要能够处理网络的闪断、客户端的重复接入、客户端的安全认证、消息的编解码、半包读写等等,如果你没有足够的NIO编程经验积累,一个NIO框架的稳定往往需要半年甚至更长的时间。更为糟糕的是一旦在生产环境中发生问题,往往会导致跨节点的服务调用中断,严重的可能会导致整个集群环境都不可用,需要重启服务器,这种非正常停机会带来巨大的损失。
从可维护性角度看,由于NIO采用了异步非阻塞编程模型,而且是一个IO线程处理多条链路,它的调试和跟踪非常麻烦,特别是生产环境中的问题,我们无法有效调试和跟踪,往往只能靠一些日志来辅助分析,定位难度很大。
2.5.1 不选择java原生的NIO编程原因
(1)NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等;
(2)需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序;
(3)可靠性能力补齐,工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大;
(4)JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,它并没有被根本解决。
Epoll: 在Linux平台上一种高效的I/O方法—epoll,针对网络游戏中大量并发客户请求问题,提出采用epoll机制建立高效网络游戏服务器思想,较好地解决了网络游戏服务器中的大量用户并发接入问题。
2.5.2 使用netty的原因
Netty是业界最流行的NIO框架之一,它的健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证,例如Hadoop的RPC框架avro使用Netty作为底层通信框架。很多其它业界主流的RPC框架,也使用Netty来构建高性能的异步通信能力。
通过对Netty的分析,我们将它的优点总结如下:
(1).API使用简单,开发门槛低;
(2).功能强大,预置了多种编解码功能,支持多种主流协议;
(3).定制能力强,可以通过ChannelHandler对通信框架进行灵活的扩展;
(4).性能高,通过与其它业界主流的NIO框架对比,Netty的综合性能最优;
(5).成熟、稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼;
(6).社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会被加入;
(7).经历了大规模的商业应用考验,质量已经得到验证。在互联网、大数据、网络游戏、企业应用、电信软件等众多行业得到成功商用,证明了它可以完全满足不同行业的商业应用。
文章标题:Java-NIO
发布时间:2019-11-15, 15:43:16
最后更新:2019-11-15, 15:43:16