官网介绍 : [ Netty是一个NIO客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了诸如TCP和UDP套接字服务器之类的网络编程。]
BIO :同步阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。
NIO :同步非阻塞,服务器实现模式为一个线程处理多个请求,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求就进行处理。
AIO : 异步非阻塞,AIO引入异步通道的概念,采用了 Proactor模式,简化了程序编写,有效的请求菜启动线程,它的特点是先由操作系统完成后才通知服务端和程序启动线程去处理,一般适用于连接数多且连接时间较长的应用
I/O使用场景:
案例 :
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author yxl
* @version 1.0
* @date 2021/3/9 20:18
*/
public class BIOServer {
public static void main(String[] args) throws IOException {
//1.创建一个线程池
ExecutorService executorService = Executors.newCachedThreadPool();
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动了");
while(true){
Socket socket = serverSocket.accept();
System.out.println("连接到一个客户端");
//创建线程池与之
executorService.execute(new Runnable() {
@Override
public void run() {
handler(socket);
}
});
}
}
public static void handler(Socket socket){
try {
System.out.println("当前线程ID"+ Thread.currentThread().getId() + "名称" + Thread.currentThread().getName());
byte[] bytes = new byte[1024];
InputStream inputStream = socket.getInputStream();
while (true){
int read = inputStream.read();
if(read != -1){
System.out.println("-");
System.out.println(new String(bytes,0,read));
}else{
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("关闭客户端的连接");
}
}
}
使用 telnet 测试
缺点:
一、Buffer :
├─ByteBuffer
├─IntBuffer
├─LongBuffer
├─ShortBuffer
├─StringCharBuffer
├─DoubleBuffer
├─CharBuffer
└ FloatBuffer
public class BasicBuffer {
public static void main(String[] args) {
//创建一个IntBuffer 大小为5
IntBuffer intBuffer = IntBuffer.allocate(5);
intBuffer.put(1);
intBuffer.put(2);
intBuffer.put(3);
intBuffer.put(4);
intBuffer.put(45);
//读写转换
intBuffer.flip();
while (intBuffer.hasRemaining()){
System.out.println(intBuffer.get());
}
}
}
所有的 Boffer 都继承 并且有几个重要的参数
public abstract class IntBuffer
extends Buffer
implements Comparable<IntBuffer>
public abstract class Buffer {
/**
* The characteristics of Spliterators that traverse and split elements
* maintained in Buffers.
*/
static final int SPLITERATOR_CHARACTERISTICS =
Spliterator.SIZED | Spliterator.SUBSIZED | Spliterator.ORDERED;
// Invariants: mark <= position <= limit <= capacity
private int mark = -1; //
private int position = 0; //当你写数据到Buffer中时,position表示当前的位置 初始值为o
private int limit;
private int capacity;
}
mark: 标记
capacity:容量,既可以容纳的最大数据量,在缓存区创建时被设定并且不能更改。
position:位置,下一个要被读或者写的元素的索引,每次读写缓存区数据时都会改变值,为下次读写做准备。
limit:表示缓存区的当前终点,不能对缓存区超过极限的位置进行读写,且极限时可以修改的。
断点:
二、Channel :
FileChannel :
public class BasicFileChannel {
public static void main(String[] args) throws IOException {
String str = "Hello World";
//创建一个输出流Channel
FileOutputStream fileOutputStream = new FileOutputStream("/Users/yanxiaolong/a.txt");
//拿到FileChannel
FileChannel channel = fileOutputStream.getChannel();
//创建缓存区Buffer
ByteBuffer allocate = ByteBuffer.allocate(1034);
allocate.put(str.getBytes(StandardCharsets.UTF_8));
//对ByteBuffer进行flip
allocate.flip();
//将ByteBuffer写入FileChannel
channel.write(allocate);
//关闭流
fileOutputStream.close();
}
}
public class BasicFileChannel2 {
public static void main(String[] args) throws IOException {
//读取文件创建输入流
File file = new File("/Users/yanxiaolong/a.txt");
FileInputStream fileInputStream = new FileInputStream(file);
FileChannel channel = fileInputStream.getChannel();
//创建缓存区Buffer
ByteBuffer allocate = ByteBuffer.allocate((int)file.length());
channel.read(allocate);
System.out.println(new String(allocate.array()));
fileInputStream.close();
}
}
MappedByteBuffer 文件Copy
public class BasicFileChannel3 {
public static void main(String[] args) throws IOException {
long start = System.currentTimeMillis();
FileChannel inChannel = FileChannel.open(Paths.get("/Users/yanxiaolong/a.txt"), StandardOpenOption.READ);
FileChannel outChannel = FileChannel.open(Paths.get("/Users/yanxiaolong/b.txt"), StandardOpenOption.WRITE,StandardOpenOption.READ,StandardOpenOption.CREATE_NEW);
//内存映射文件
MappedByteBuffer inMappedBuf = inChannel.map(FileChannel.MapMode.READ_ONLY, 0, inChannel.size());
MappedByteBuffer outMappedBuf = outChannel.map(FileChannel.MapMode.READ_WRITE, 0, inChannel.size());
//直接对缓冲区进行数据的读写操作
byte[] dst = new byte[inMappedBuf.limit()];
inMappedBuf.get(dst);
outMappedBuf.put(dst);
inChannel.close();
outChannel.close();
long end = System.currentTimeMillis();
System.out.println("内存映射文件所花时间:"+(end-start));
}
}
三、Selector :
服务端
public class NIOServer {
public static void main(String[] args) throws IOException {
//创建ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//创建选择器
Selector selector = Selector.open();
serverSocketChannel.bind(new InetSocketAddress(6666));
serverSocketChannel.configureBlocking(false);
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
//循环等待客户端连接
while (true){
if(selector.select(1000) == 0){
System.out.println("服务器等待了一秒 无连接");
continue;
}
//如果大于0就获取相关的selectedKeys连接,已经获取到关注到事件
//selector.selectedKeys(); 返回事件的集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
//获取下一个元素
SelectionKey next = iterator.next();
//如果是OP_ACCEPT,有新的客户端连接
if(next.isAcceptable()){
//该客户端生成一个SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
System.out.println("客户端连接成功 生成一个 socketChannel" + socketChannel.hashCode());
//注册,关联ByteBuffer
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
//发送OP_READ
if(next.isReadable()){
SocketChannel socketChannel = (SocketChannel) next.channel();
ByteBuffer byteBuffer = (ByteBuffer) next.attachment();
socketChannel.read(byteBuffer);
System.out.println("from 客户端:" + new String(byteBuffer.array()));
}
iterator.remove();
}
}
}
}
客户端
public class NIOClient {
public static void main(String[] args) throws IOException {
//得到网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//socket连接地址
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if(!socketChannel.connect(inetSocketAddress)){
while(!socketChannel.finishConnect()){
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作");
}
}
//连接成功就发送数据
String str = "hello world";
ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes(StandardCharsets.UTF_8));
//发送数据 将Buffer 数据写入Channel
socketChannel.write(byteBuffer);
System.in.read();
}
}
原理:
SelectionKey : 表示SelectableChannel 和 Selector 之间的注册关系,每次向选择器注册通道时就会选择一个事件(选择键)。选择键包含2个表示为整数值的操作集。操作集的每一位都表示该键的通道锁支持的一类可选操作
介绍:
传统I/O :
mmap优化 :
sendFile :
mmap 和 sendFile的区别
案例:
public class NewIoServer {
public static void main(String[] args) throws Exception {
InetSocketAddress address = new InetSocketAddress(7001);
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(address);
//创建buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(4096);
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
int readcount = 0;
while (-1 != readcount) {
try {
readcount = socketChannel.read(byteBuffer);
} catch (Exception ex) {
break;
}
//倒带 position = 0 mark 作废
byteBuffer.rewind();
}
}
}
}
public class NewIoClient {
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 7001));
String filename = "protoc-3.6.1-win32.zip";
//得到一个文件channel
FileChannel fileChannel = new FileInputStream(filename).getChannel();
//准备发送
long startTime = System.currentTimeMillis();
//在linux下一个transferTo 方法就可以完成传输
//在windows 下 一次调用 transferTo 只能发送8m , 就需要分段传输文件, 而且要主要
//传输时的位置 =》 课后思考...
//transferTo 底层使用到零拷贝
long transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
System.out.println("发送的总的字节数 =" + transferCount + " 耗时:" + (System.currentTimeMillis() - startTime));
//关闭
fileChannel.close();
}
}
线程模型基本介绍:
单线程模型就是只指定一个线程执行客户端连接和读写操作,也就是在一个Reactor中完成,对应在Netty中的实现就是将NioEventLoopGroup线程数设置为1,核心代码是:
NioEventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());
多线程模型就是在一个单Reactor中进行客户端连接处理,然后业务处理交给线程池,核心代码如下:
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());
主从多线程模型是有多个Reactor,也就是存在多个selector,所以我们定义一个bossGroup和一个workGroup,核心代码如下:
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());
因篇幅问题不能全部显示,请点此查看更多更全内容