现《第9章 用Netty实现物联网》节略部分
Java NIO
说了这么多理论,但最终还是只有通过代码才能更好地体现NIO技术的优势,下面来看看NIO
的Channel
和Buffer
是不是会比BIO
更有效率。
package cn.javabook.chapter11.nio;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* NIO中的Channel和Buffer
*
*/
public class TestNIO {
public static void main(String[] args) throws IOException {
// 传统I/O
long start = System.currentTimeMillis();
FileInputStream fis1 = new FileInputStream("/testfile1");
FileOutputStream fos1 = new FileOutputStream("/testfile2");
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(fis1), 1024);
String line = null;
while ((line = bufferedReader.readLine()) != null) {
fos1.write(line.getBytes());
}
} catch (IOException e) {
e.printStackTrace();
} finally {
fis1.close();
fos1.close();
}
long end = System.currentTimeMillis();
System.out.println("传统IO耗时:" + (end - start) + " 毫秒");
// 改进的I/O
start = System.currentTimeMillis();
FileInputStream fis2 = new FileInputStream("/testfile1");
FileOutputStream fos2 = new FileOutputStream("/testfile3");
try {
byte[] b = new byte[1024];
int len = 0;
while ((len = fis2.read(b)) != -1) {
fos2.write(b, 0, len);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
fis2.close();
fos2.close();
}
end = System.currentTimeMillis();
System.out.println("改进的IO耗时:" + (end - start) + " 毫秒");
// NIO
start = System.currentTimeMillis();
FileChannel fis3 = new FileInputStream("/testfile1").getChannel();
FileChannel fos3 = new FileOutputStream("/testfile4").getChannel();
try {
ByteBuffer bytedata = ByteBuffer.allocate(1024);
while (fis3.read(bytedata) != -1) {
// 读写交叉进行
bytedata.flip();
fos3.write(bytedata);
bytedata.clear();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
fis3.close();
fos3.close();
}
end = System.currentTimeMillis();
System.out.println("NIO耗时:" + (end - start) + " 毫秒");
// // Scattering reads
// ByteBuffer buffer1 = ByteBuffer.allocate(1024);
// ByteBuffer buffer2 = ByteBuffer.allocate(1024);
// ByteBuffer[] bufferArray1 = { buffer1, buffer2 };
// FileChannel channel1 = new FileInputStream("/Users/bear/home/work/testfile1").getChannel();
// channel1.read(bufferArray1);
//
// // Gathering writes
// ByteBuffer buffer3 = ByteBuffer.allocate(1024);
// ByteBuffer buffer4 = ByteBuffer.allocate(1024);
// ByteBuffer[] bufferArray2 = { buffer1, buffer2 };
// FileChannel channel2 = new FileInputStream("/Users/bear/home/work/testfile1").getChannel();
// channel2.write(bufferArray2);
}
}
上面通过三种不同的I/O
复制一个事先准备好的大文件,如果文件太小会看不到效果。
传统的IO
、改进的IO
和NIO
的不同在于,一个是BufferedReader
,而一个是byte[]
,NIO
则使用了Channel
和Buffer
。
执行代码后,最终输出如下结果。
传统IO耗时:14141 毫秒
改进的IO耗时:1208 毫秒
NIO耗时:1268 毫秒
从结果能明显看出,NIO
相比于传统IO
的技术优势。而改进的I/O
使用了byte[]数组
,这也等同于变相使用缓冲了,因此效果和NIO
相差不大。
在《Java编程思想(第四版)》
和《On Java 8》
(实际上是《Java编程思想(第五版)》
)中,都提到过新I/O
使用NIO
的方式重写了老的I/O,而使它获得了NIO
的种种优点。所以即使不显式地使用NIO
,也能带来性能和速度的提升。

至于Selector
的轮询,用代码演示也比较容易。
package cn.javabook.chapter11.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Set;
/**
* NIO中的Selector
*
*/
public class TestSelector {
public static void main(String[] args) throws IOException {
// 创建ServerSocketChannel
ServerSocketChannel channel1 = ServerSocketChannel.open();
channel1.socket().bind(new InetSocketAddress("127.0.0.1", 9527));
channel1.configureBlocking(false);
ServerSocketChannel channel2 = ServerSocketChannel.open();
channel2.socket().bind(new InetSocketAddress("127.0.0.1", 9528));
channel2.configureBlocking(false);
// 创建一个Selector对象
Selector selector = Selector.open();
// 按照字面意思理解,应该是这样的:selector.register(channel, event);
// 但其实是这样的:channel.register(selector, SelectionKey.OP_READ);
// 四种监听事件:
// OP_CONNECT(连接就绪)
// OP_ACCEPT(接收就绪)
// OP_READ(读就绪)
// OP_WRITE(写就绪)
// 注册Channel到Selector,事件一旦被触发,监听随之结束
SelectionKey key1 = channel1.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key2 = channel2.register(selector, SelectionKey.OP_ACCEPT);
// 模版代码:在编写程序时,大多数时间都是在模板代码中添加相应的业务代码
while(true) {
int readyNum = selector.select();
if (readyNum == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
// 轮询
for (SelectionKey key : selectedKeys) {
Channel channel = key.channel();
if (key.isConnectable()) {
if (channel == channel1) {
System.out.println("channel1连接就绪");
}
if (channel == channel2) {
System.out.println("channel2连接就绪");
}
} else if (key.isAcceptable()) {
if (channel == channel1) {
System.out.println("channel1接收就绪");
}
if (channel == channel2) {
System.out.println("channel2接收就绪");
}
}
// 触发后删除,这里不删
// it.remove();
}
}
}
}
为了测试效果,需要借助于一个额外的辅助工具SocketTest.jar
。

首先运行
TestSelector
的main()
方法,启动Selector
服务。然后双击
SocketTest.jar
,在其中输入代码指定的地址127.0.0.1
和端口9527
。再单击
SocketTest
应用程序中的Connect
按钮,单击后如下图所示。

此时再观察代码输出,会发现如下打印结果。
channel1接收就绪
这时候单击Disconnect
,输入另一个端口号9528
后,再次单击Connect
。

再次观察代码输出。
channel2接收就绪
channel1接收就绪
从结果可以说明,之前虽然连接过channel1
,但却因为主动Disconnect
而断开了,尽管如此,Selector
依然保持了channel1
的活跃状态。
当通过SocketTest
应用程序再次连接到channel2
时,Selector
会一并判断channel1
的状态——这已经很充分地展示了Selector
的轮询机制。
Java AIO
下面是AIO
用法的演示。
客户端相关代码
package cn.javabook.chapter11.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.TimeUnit;
/**
* AIO客户端
*
*/
public class AioClient {
public void start() throws IOException, InterruptedException {
AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();
if (channel.isOpen()) {
// socket接收缓冲区recbuf大小
channel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024);
// socket发送缓冲区recbuf大小
channel.setOption(StandardSocketOptions.SO_SNDBUF, 128 * 1024);
// 保持长连接状态
channel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
// 连接到服务端
channel.connect(new InetSocketAddress(8080), null, new AioClientHandler(channel));
// 阻塞主进程
for(;;) {
TimeUnit.MILLISECONDS.sleep(1000);
}
} else {
throw new RuntimeException("Channel not opened!");
}
}
public static void main(String[] args) throws IOException, InterruptedException {
new AioClient().start();
}
}
package cn.javabook.chapter11.aio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.concurrent.ExecutionException;
/**
* AIO客户端CompletionHandler
*
*/
public class AioClientHandler implements CompletionHandler<Void, AioClient> {
private final AsynchronousSocketChannel channel;
private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
public AioClientHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
@Override
public void failed(Throwable exc, AioClient attachment) {
throw new RuntimeException("channel not opened!");
}
@Override
public void completed(Void result, AioClient attachment) {
System.out.println("send message to server: ");
try {
// 将输入内容写到buffer
String line = input.readLine();
channel.write(ByteBuffer.wrap(line.getBytes()));
// 在操作系统中的Java本地方法native已经把数据写到了buffer中
// 这里只需要一个缓冲区能接收就行了
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (channel.read(buffer).get() != -1) {
buffer.flip();
System.out.println("from server: " + decoder.decode(buffer).toString());
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
// 将输入内容写到buffer
line = input.readLine();
channel.write(ByteBuffer.wrap(line.getBytes()));
}
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
服务端相关代码
package cn.javabook.chapter11.aio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.TimeUnit;
/**
* AIO服务端
*
*/
public class AioServer {
public void start() throws InterruptedException, IOException {
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
if (channel.isOpen()) {
// socket接受缓冲区recbuf大小
channel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
// 端口重用,防止进程意外终止,未释放端口,重启时失败
// 因为直接杀进程,没有显式关闭套接字来释放端口,会等待一段时间后才可以重新use这个关口
// 解决办法就是用SO_REUSEADDR
channel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
channel.bind(new InetSocketAddress(8080));
} else {
throw new RuntimeException("channel not opened!");
}
// 处理client连接
channel.accept(null, new AioServerHandler(channel));
System.out.println("server started");
// 阻塞主进程
for(;;) {
TimeUnit.SECONDS.sleep(1);
}
}
public static void main(String[] args) throws IOException, InterruptedException {
AioServer server = new AioServer();
server.start();
}
}
package cn.javabook.chapter11.aio;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.concurrent.ExecutionException;
/**
* AIO服务端CompletionHandler
*
*/
public class AioServerHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
private final AsynchronousServerSocketChannel serverChannel;
private final CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
private final BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
public AioServerHandler(AsynchronousServerSocketChannel serverChannel) {
this.serverChannel = serverChannel;
}
@Override
public void failed(Throwable exc, Void attachment) {
// 处理下一次的client连接
serverChannel.accept(null, this);
}
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
// 处理下一次的client连接,类似链式调用
serverChannel.accept(null, this);
try {
// 将输入内容写到buffer
String line = input.readLine();
result.write(ByteBuffer.wrap(line.getBytes()));
// 在操作系统中的Java本地方法native已经把数据写到了buffer中
// 这里只需要一个缓冲区能接收就行了
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (result.read(buffer).get() != -1) {
buffer.flip();
System.out.println("from client: " + decoder.decode(buffer).toString());
if (buffer.hasRemaining()) {
buffer.compact();
} else {
buffer.clear();
}
// 将输入内容写到buffer
line = input.readLine();
result.write(ByteBuffer.wrap(line.getBytes()));
}
} catch (InterruptedException | ExecutionException | IOException e) {
e.printStackTrace();
}
}
}
感谢支持
更多内容,请移步《超级个体》。