听说你不知道Java并发编程中三种I/O模型?看这一篇就够了
I/O模型基本说明
I/O模型:就是用什么样的通信模式和架构进行数据的传输和接收,很大程度上决定了程序间通信性能。Java中网络编程的I/O模型有:BIO、NIO、AIO,我们可以根据本身业务场景和性能需求来选择不同的I/O模型
BIO
传统的同步阻塞模型,一个客户端连接对应一个单独的线程来处理,如果这个连接不做任何事会造成不必要的线程开销。可以通过线程池机制进行改善
在处理多个客户端请求时,可能会因为线程数量的限制而影响性能,适用于连接数较少的场景,这种模型在并发度低的业务场景中表现良好,例如公司内部的管理系统,请求数量有限
简单示例
网络编程的基本模型是Client/Server模型,即进程间相互通信,其中服务端提供IP地址和端口供客户端连接,双方通过输入和输出流进行通信。示例:实现BIO模式下客户端与服务端通信
public class Server {
public static void main(String[] args) {
// 定义 ServerSocket 对象并进行端口注册
try(ServerSocket ss = new ServerSocket(8080)) {
while (true) {
// 监听客户端 Socket 连接请求
Socket socket = ss.accept();
new Thread(new ServerThread(socket)).start();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static class ServerThread implements Runnable {
private Socket socket;
public ServerThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
// 获取输入流,接收客户端发送的消息
try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
String msg;
// 读取流内数据
while (Objects.nonNull(msg = br.readLine())) {
System.out.println(Thread.currentThread().getName() + "服务端接收到消息:" + msg);
}
} catch (Exception e) {
try {
this.socket.close();
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}
}
public class Client {
public static void main(String[] args) {
try {
// 创建 Socket 并与服务端建立连接
Socket socket = new Socket("localhost", 8080);
// 从 Socket 对象中获取字符输出流对象
PrintStream ps = new PrintStream(socket.getOutputStream(), true);
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("请输入内容:");
String context = scanner.nextLine();
if (Objects.equals("q", context)){
socket.shutdownOutput();
break;
} else {
ps.println(context);
}
}
ps.close();
socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
小结:
- 每接收到一个连接都会创建一个线程,线程的竞争、上下文切换影响性能
- 每个线程都会占用栈空间和CPU资源
- 如果客户端没有发送消息,服务端将一直进入阻塞状态占用资源,造成资源浪费
- 当一端出现异常或断开时,另一端同样会抛出异常信息
- 客户端并发增加时,服务端需开启对等的线程资源,访问量过大可能会发生线程栈溢出,最终导致系统死机而无法提供服务
改进
我们可以采用线程池和任务队控制服务端线程数量,从而避免因连接数过多导致服务端线程激增的情况。线程池的使用请参照上一篇文章如何正确使用线程池
编写线程池处理类
public class ServerThreadPoolHandler {
private ExecutorService executorService;
public ServerThreadPoolHandler(int corePoolSize, int maxPoolSize, int workQueueSize) {
ThreadFactory factory = r -> new Thread(r,"bio-server-thread-" + System.currentTimeMillis());
this.executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(workQueueSize), factory);
}
public void execute(Runnable runnable) {
this.executorService.execute(runnable);
}
}
改造server类main方法
public static void main(String[] args) {
// 定义 ServerSocket 对象并进行端口注册
try(ServerSocket ss = new ServerSocket(8080)) {
ServerThreadPoolHandler handler = new ServerThreadPoolHandler(2, 8, 100);
while (true) {
// 监听客户端 Socket 连接请求
Socket socket = ss.accept();
ServerThread thread = new ServerThread(socket);
handler.execute(thread);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
使用线程池改进虽然避免了为每个请求创建一个独立的线程,但由于底层依然是同步阻塞模型,如果单个消息处理慢或服务器线程池中全部线程都被阻塞,那么后续客户端消息都将在队列中排队,新的请求被拒绝,客户端会发生大量超时情况,因此无法从根本上解决问题。
NIO
Java1.4引入的同步非阻塞IO模型,通过使用Selector机制实现多路复用,支持面向缓冲区、基于通道的IO操作,允许单个线程处理多个连接请求,多路复用器轮询处理当前通道内可处理的数据,若有就进行处理,没有则去处理其它事,提高了系统的并发性能和吞吐量。这种模型适用于高并发、低延迟的场景,例如聊天服务器,弹幕服务器
BIO基于流阻塞式处理数据,而NIO基于通道和缓冲区非阻塞式处理数据,数据从通道内读取到缓冲区中或从缓冲区内写入到通道中,使用Selector监听多个通道事件,因此单个线程就可以监听多个客户端通道,效率比BIO高很多
NIO相关类都被放在java.nio包及其子包下,并且对原java.io包中的很多类进行改写。有三大核心部分:Channel(通道)、Buffer(缓冲区)和Selector(选择器)
Buffer
一个特定基本数据类型的容器,本质是一块可以写入和读取数据的内存。这块内存被包装成NIO Buffer对象,,并提供一组方法,用来方便的访问该块内存,主要用于与BIO通道进行交互。相比较直接对数组操作,Buffer API更加容易操作和管理
Buffer有以下常用子类:ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer
重要参数
容量 capacity
:Buffer 内存块可用的最大数据量,通常以字节为单位,创建后不能更改界限 limit
:Buffer 可以读或写的最大数据量。对于写模式,limit 通常和 capacity 相等,表示可以写入的最大数据量;对于读模式,limit 标记了可以安全从 Buffer 中读取的数据量位置 position
:下一次读或写的位置,每次读写数据后,位置都会相应的前或后移动标记 mark
:标记 Buffer 中的一个特定位置,以便后续操作可以使用 reset 快速定位到此位置重置 reset
:与 mark 配合使用快速定位到标记位置
0 <= mark <= position <= limit <= capacity
常见方法
allocate()
:用于创建一个指定容量的 Buffer 实例put()
:将数据写入到 Buffer,通常在写入模式下进行操作flip()
:读取数据前调用,Buffer 从写模式切换到读模式,并准备开始读数据get()
:从 Buffer 中读数据,通常在读模式下操作clean()
:清空 Buffer,重置 position、limit 和 mark 等参数并返回对缓冲区的引用以便再次被写入数据capacity()
: 返回 Buffer 的大小hasRemaining()
:判断 Buffer 中是否还有元素limit()
:返回 Buffer 的界限位置limit(int n)
:设置 Buffer 界限为n,并返回具有新 limit 的 Buffer 对象mark()
:对 Buffer 设置标记,以便稍后可以通过 reset() 方法恢复到这个位置position()
:返回 Buffer 的当前位置position(int n)
:设置 Buffer 的当前位置为n,即下一次读写操作的起始位置。n必须是有效索引值remaining()
:返回 position 和 limit 之间的元素个数reset()
:将位置 position 重置到 mark 所在位置rewind()
:将 position 重置为0,limit 设置为原来 limit 值,取消 mark 标记,可用于重新读取数据compact()
:清除 Buffer 中已经读取的数据,将未读数据移动到 Buffer 开始位置,接着写入位置紧随未读数据之后
直接与间接缓冲区
NIO中ByteBuffer有直接缓冲区和间接内存两种类型,他们内存分配和性能特定有所不同:
直接缓冲区
:直接缓冲区是在Java虚拟机(JVM)之外的物理内存中开辟的一块内存区域,可以通过allocateDirect
创建,使用isDirect()
判断。这意味着当进行I/O操作时,数据可以直接在这个内存区域和操作系统之间传输,避免了在Java堆内存和native内存之间的额外复制过程。这种方式可以极大地提高读写效率,特别是在涉及到文件映射或者网络I/O时。由于直接缓冲区是在JVM外的物理内存中分配的,它们的创建和销毁开销较大,且占用的内存只能通过Java的垃圾回收机制来释放间接缓冲区
:间接缓冲区是在Java虚拟机的堆内存中创建的。相比于直接缓冲区,间接缓冲区的创建和销毁成本较小,易于回收。但是,在进行I/O操作时,数据需要在Java堆内存和操作系统之间进行复制,这可能会降低性能,尤其是在大量数据处理时
总的来说,直接缓冲区提供了更高的I/O效率,适合性能敏感的场景,而间接缓冲区则在内存管理上更为便捷,适用于对性能要求不是特别高的情况。在实际使用时,需要根据具体的应用场景和性能需求来选择合适的缓冲区类型
Channel
表示IO源与目标打开的连接,具有的流数据读写能力,本身并不直接访问数据只与 Buffer 交互,与流不同的是流的读写通常是单向的,通道可以非阻塞和异步地读取和写入通道数据
常见类
FileChannel
:用于读写文件的通道,可对文件进行高效的随机访问SocketChannel
:用于TCP网络通信,提供了客户端和服务期间的数据传输ServerSocketChannel
:用于服务器端,监听新的TCP连接请求DatagramChannel
:用于UDP协议的网络通信MulticastChannel
:用于IP多播通信,允许一个发送者和多个接收者之间数据交互ScatteringByteChannel
:用于散射数据的读写操作GatheringByteChannel
:用于聚集数据的读写操作
Selector
Selector 是 SelectableChannel 对象的多路复用器,可以对多个通道状态进行监控,以判断通道是否准备好对数据的读写工作,只有在 Channel 有读写事件发生时才会进行读写,从而降低了系统开销,且不必去为每个连接都创建线程,避免多线程之间的上下文切换,从而提高效率
应用
Selector 通过 Selector.open()
方法创建,然后向其进行通道注册,当调用register(Selector sel, int ops)
进行注册时,需要通过第二个参数指定监听事件类型,包括:读
:SelectoionKey.OP_READ (1 << 0)写
:SelectoionKey.OP_WRITE (1 << 2)连接
:SelectoionKey.OP_CONNECT (1 << 3)接收
:SelectoionKey.OP_ACCEPT (1 << 4)
若注册多个监听事件,则可以通过位或操作符连接,如:SelectoionKey.OP_READ | SelectoionKey.OP_WRITE
示例
服务端代码
public class Server {
public static void main(String[] args) {
try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
serverChannel.configureBlocking(false);
serverChannel.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
// 注册监听接收事件
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 轮询已经就绪事件
while (selector.select() > 0) {
// 获取所有已就绪事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
// 获取事件
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
// 获取当前接入的客户端通道
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (socketChannel.read(buffer) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.remaining()));
buffer.clear();
}
}
iterator.remove();
}
}
} catch (IOException e) {
System.out.println("有客户端下线了");
e.printStackTrace();
}
}
}
注意: channel默认是阻塞式,所以*.configureBlocking(false);
一定要写,否则会抛出IllegalBlockingModeException
异常,向选择器注册必须要非阻塞模式
客户端代码
public class Client {
public static void main(String[] args) {
try (SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080))) {
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("请输入内容:");
buffer.put(scanner.nextLine().getBytes(StandardCharsets.UTF_8));
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
AIO
Java1.7引入的异步非阻塞IO模型,基于NIO框架构建,进一步简化了异步IO操作的开发。在AIO模型中,应用程序发起IO操作后无需等待结果,操作完成后会通过回调函数通知应用程序,从而能够更加高效地处理大量并发请求
服务端示例
public class Server {
public static void main(String[] args) {
try (AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open()) {
serverChannel.bind(new InetSocketAddress(8080));
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
serverChannel.accept(null, this);// 继续接收下一个连接
ByteBuffer buffer = ByteBuffer.allocate(1024);
result.read(buffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(Integer result1, Object attachment) {
buffer.flip();
System.out.println("获取客户端数据:" + new String(buffer.array(), 0, buffer.remaining()));
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("接收客户端数据异常" + exc.getMessage());
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("接收客户端连接异常" + exc.getMessage());
}
});
System.in.read();
} catch (IOException e) {
System.out.println("服务端启动异常" + e.getMessage());
}
}
}
客户端示例
public class Client {
public static void main(String[] args) {
try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
Future<Void> future = client.connect(new InetSocketAddress("localhost", 8080));
future.get();
Scanner scanner = new Scanner(System.in);
while (true) {
System.out.print("请输入发送内容:");
client.write(ByteBuffer.wrap(scanner.nextLine().getBytes(StandardCharsets.UTF_8)));
}
} catch (IOException e) {
System.out.println("客户端启动异常" + e.getMessage());
} catch (ExecutionException | InterruptedException e) {
System.out.println("客户端消息异常" + e.getMessage());
}
}
}
- 本文标签: io java
- 本文链接: https://www.58cto.cn/article/49
- 版权声明: 本文由程序言原创发布, 非商业性可自由转载、引用,但需署名作者且注明文章出处:程序言 》 听说你不知道Java并发编程中三种I/O模型?看这一篇就够了 - https://www.58cto.cn/article/49