原创

听说你不知道Java并发编程中三种I/O模型?看这一篇就够了

温馨提示:
本文最后更新于 2024年03月22日,已超过 306 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

I/O模型基本说明

I/O模型:就是用什么样的通信模式和架构进行数据的传输和接收,很大程度上决定了程序间通信性能。Java中网络编程的I/O模型有:BIO、NIO、AIO,我们可以根据本身业务场景和性能需求来选择不同的I/O模型

BIO

传统的同步阻塞模型,一个客户端连接对应一个单独的线程来处理,如果这个连接不做任何事会造成不必要的线程开销。可以通过线程池机制进行改善

在处理多个客户端请求时,可能会因为线程数量的限制而影响性能,适用于连接数较少的场景,这种模型在并发度低的业务场景中表现良好,例如公司内部的管理系统,请求数量有限

简单示例

网络编程的基本模型是Client/Server模型,即进程间相互通信,其中服务端提供IP地址和端口供客户端连接,双方通过输入和输出流进行通信。示例:实现BIO模式下客户端与服务端通信

public class Server {
    public static void main(String[] args) {
        // 定义 ServerSocket 对象并进行端口注册
        try(ServerSocket ss = new ServerSocket(8080)) {
            while (true) {
                // 监听客户端 Socket 连接请求
                Socket socket = ss.accept();
                new Thread(new ServerThread(socket)).start();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static class ServerThread implements Runnable {
        private Socket socket;

        public ServerThread(Socket socket) {
            this.socket = socket;
        }

        @Override
        public void run() {
            // 获取输入流,接收客户端发送的消息
            try(BufferedReader br = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                String msg;
                // 读取流内数据
                while (Objects.nonNull(msg = br.readLine())) {
                    System.out.println(Thread.currentThread().getName() + "服务端接收到消息:" + msg);
                }
            } catch (Exception e) {
                try {
                    this.socket.close();
                } catch (IOException ex) {
                    throw new RuntimeException(ex);
                }
                throw new RuntimeException(e);
            }
        }
    }
}
public class Client {
    public static void main(String[] args) {
        try {
            // 创建 Socket 并与服务端建立连接
            Socket socket = new Socket("localhost", 8080);
            // 从 Socket 对象中获取字符输出流对象
            PrintStream ps = new PrintStream(socket.getOutputStream(), true);
            Scanner scanner = new Scanner(System.in);
            while (true) {
                System.out.print("请输入内容:");
                String context = scanner.nextLine();
                if (Objects.equals("q", context)){
                    socket.shutdownOutput();
                    break;
                } else {
                    ps.println(context);
                }
            }
            ps.close();
            socket.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

服务端日志

小结:

  • 每接收到一个连接都会创建一个线程,线程的竞争、上下文切换影响性能
  • 每个线程都会占用栈空间和CPU资源
  • 如果客户端没有发送消息,服务端将一直进入阻塞状态占用资源,造成资源浪费
  • 当一端出现异常或断开时,另一端同样会抛出异常信息
  • 客户端并发增加时,服务端需开启对等的线程资源,访问量过大可能会发生线程栈溢出,最终导致系统死机而无法提供服务

改进

我们可以采用线程池和任务队控制服务端线程数量,从而避免因连接数过多导致服务端线程激增的情况。线程池的使用请参照上一篇文章如何正确使用线程池

编写线程池处理类

public class ServerThreadPoolHandler {
    private ExecutorService executorService;

    public ServerThreadPoolHandler(int corePoolSize, int maxPoolSize, int workQueueSize) {
        ThreadFactory factory = r -> new Thread(r,"bio-server-thread-" + System.currentTimeMillis());
        this.executorService = new ThreadPoolExecutor(corePoolSize, maxPoolSize, 10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(workQueueSize), factory);
    }

    public void execute(Runnable runnable) {
        this.executorService.execute(runnable);
    }
}

改造server类main方法

public static void main(String[] args) {
        // 定义 ServerSocket 对象并进行端口注册
        try(ServerSocket ss = new ServerSocket(8080)) {
            ServerThreadPoolHandler handler = new ServerThreadPoolHandler(2, 8, 100);
            while (true) {
                // 监听客户端 Socket 连接请求
                Socket socket = ss.accept();
                ServerThread thread = new ServerThread(socket);
                handler.execute(thread);
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

服务端日志

使用线程池改进虽然避免了为每个请求创建一个独立的线程,但由于底层依然是同步阻塞模型,如果单个消息处理慢或服务器线程池中全部线程都被阻塞,那么后续客户端消息都将在队列中排队,新的请求被拒绝,客户端会发生大量超时情况,因此无法从根本上解决问题。

NIO

Java1.4引入的同步非阻塞IO模型,通过使用Selector机制实现多路复用,支持面向缓冲区、基于通道的IO操作,允许单个线程处理多个连接请求,多路复用器轮询处理当前通道内可处理的数据,若有就进行处理,没有则去处理其它事,提高了系统的并发性能和吞吐量。这种模型适用于高并发、低延迟的场景,例如聊天服务器,弹幕服务器

BIO基于流阻塞式处理数据,而NIO基于通道和缓冲区非阻塞式处理数据,数据从通道内读取到缓冲区中或从缓冲区内写入到通道中,使用Selector监听多个通道事件,因此单个线程就可以监听多个客户端通道,效率比BIO高很多

NIO相关类都被放在java.nio包及其子包下,并且对原java.io包中的很多类进行改写。有三大核心部分:Channel(通道)、Buffer(缓冲区)和Selector(选择器)

Buffer

一个特定基本数据类型的容器,本质是一块可以写入和读取数据的内存。这块内存被包装成NIO Buffer对象,,并提供一组方法,用来方便的访问该块内存,主要用于与BIO通道进行交互。相比较直接对数组操作,Buffer API更加容易操作和管理

Buffer有以下常用子类:ByteBuffer、CharBuffer、ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer

重要参数

  • 容量 capacity:Buffer 内存块可用的最大数据量,通常以字节为单位,创建后不能更改
  • 界限 limit:Buffer 可以读或写的最大数据量。对于写模式,limit 通常和 capacity 相等,表示可以写入的最大数据量;对于读模式,limit 标记了可以安全从 Buffer 中读取的数据量
  • 位置 position:下一次读或写的位置,每次读写数据后,位置都会相应的前或后移动
  • 标记 mark:标记 Buffer 中的一个特定位置,以便后续操作可以使用 reset 快速定位到此位置
  • 重置 reset:与 mark 配合使用快速定位到标记位置

0 <= mark <= position <= limit <= capacity

常见方法

  • allocate():用于创建一个指定容量的 Buffer 实例
  • put():将数据写入到 Buffer,通常在写入模式下进行操作
  • flip():读取数据前调用,Buffer 从写模式切换到读模式,并准备开始读数据
  • get():从 Buffer 中读数据,通常在读模式下操作
  • clean():清空 Buffer,重置 position、limit 和 mark 等参数并返回对缓冲区的引用以便再次被写入数据
  • capacity(): 返回 Buffer 的大小
  • hasRemaining():判断 Buffer 中是否还有元素
  • limit():返回 Buffer 的界限位置
  • limit(int n):设置 Buffer 界限为n,并返回具有新 limit 的 Buffer 对象
  • mark():对 Buffer 设置标记,以便稍后可以通过 reset() 方法恢复到这个位置
  • position():返回 Buffer 的当前位置
  • position(int n):设置 Buffer 的当前位置为n,即下一次读写操作的起始位置。n必须是有效索引值
  • remaining():返回 position 和 limit 之间的元素个数
  • reset():将位置 position 重置到 mark 所在位置
  • rewind():将 position 重置为0,limit 设置为原来 limit 值,取消 mark 标记,可用于重新读取数据
  • compact():清除 Buffer 中已经读取的数据,将未读数据移动到 Buffer 开始位置,接着写入位置紧随未读数据之后

直接与间接缓冲区

NIO中ByteBuffer有直接缓冲区和间接内存两种类型,他们内存分配和性能特定有所不同:

  • 直接缓冲区:直接缓冲区是在Java虚拟机(JVM)之外的物理内存中开辟的一块内存区域,可以通过allocateDirect创建,使用isDirect()判断。这意味着当进行I/O操作时,数据可以直接在这个内存区域和操作系统之间传输,避免了在Java堆内存和native内存之间的额外复制过程。这种方式可以极大地提高读写效率,特别是在涉及到文件映射或者网络I/O时。由于直接缓冲区是在JVM外的物理内存中分配的,它们的创建和销毁开销较大,且占用的内存只能通过Java的垃圾回收机制来释放

  • 间接缓冲区:间接缓冲区是在Java虚拟机的堆内存中创建的。相比于直接缓冲区,间接缓冲区的创建和销毁成本较小,易于回收。但是,在进行I/O操作时,数据需要在Java堆内存和操作系统之间进行复制,这可能会降低性能,尤其是在大量数据处理时

总的来说,直接缓冲区提供了更高的I/O效率,适合性能敏感的场景,而间接缓冲区则在内存管理上更为便捷,适用于对性能要求不是特别高的情况。在实际使用时,需要根据具体的应用场景和性能需求来选择合适的缓冲区类型

Channel

表示IO源与目标打开的连接,具有的流数据读写能力,本身并不直接访问数据只与 Buffer 交互,与流不同的是流的读写通常是单向的,通道可以非阻塞和异步地读取和写入通道数据

常见类

  • FileChannel:用于读写文件的通道,可对文件进行高效的随机访问
  • SocketChannel:用于TCP网络通信,提供了客户端和服务期间的数据传输
  • ServerSocketChannel:用于服务器端,监听新的TCP连接请求
  • DatagramChannel:用于UDP协议的网络通信
  • MulticastChannel:用于IP多播通信,允许一个发送者和多个接收者之间数据交互
  • ScatteringByteChannel:用于散射数据的读写操作
  • GatheringByteChannel:用于聚集数据的读写操作

Selector

Selector 是 SelectableChannel 对象的多路复用器,可以对多个通道状态进行监控,以判断通道是否准备好对数据的读写工作,只有在 Channel 有读写事件发生时才会进行读写,从而降低了系统开销,且不必去为每个连接都创建线程,避免多线程之间的上下文切换,从而提高效率

应用

Selector 通过 Selector.open() 方法创建,然后向其进行通道注册,当调用register(Selector sel, int ops)进行注册时,需要通过第二个参数指定监听事件类型,包括:
:SelectoionKey.OP_READ (1 << 0)
:SelectoionKey.OP_WRITE (1 << 2)
连接:SelectoionKey.OP_CONNECT (1 << 3)
接收:SelectoionKey.OP_ACCEPT (1 << 4)
若注册多个监听事件,则可以通过位或操作符连接,如:SelectoionKey.OP_READ | SelectoionKey.OP_WRITE

示例

服务端代码
public class Server {
    public static void main(String[] args) {
        try (ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            serverChannel.configureBlocking(false);
            serverChannel.bind(new InetSocketAddress(8080));
            Selector selector = Selector.open();
            // 注册监听接收事件
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            // 轮询已经就绪事件
            while (selector.select() > 0) {
                // 获取所有已就绪事件
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    // 获取事件
                    SelectionKey key = iterator.next();
                    if (key.isAcceptable()) {
                        // 获取当前接入的客户端通道
                        SocketChannel socketChannel = serverChannel.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        while (socketChannel.read(buffer) > 0) {
                            buffer.flip();
                            System.out.println(new String(buffer.array(), 0, buffer.remaining()));
                            buffer.clear();
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            System.out.println("有客户端下线了");
            e.printStackTrace();
        }
    }
}

注意: channel默认是阻塞式,所以
*.configureBlocking(false);一定要写,否则会抛出IllegalBlockingModeException异常,向选择器注册必须要非阻塞模式
异常

客户端代码
public class Client {
    public static void main(String[] args) {
        try (SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080))) {
            socketChannel.configureBlocking(false);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            Scanner scanner = new Scanner(System.in);
            while (true) {
                System.out.print("请输入内容:");
                buffer.put(scanner.nextLine().getBytes(StandardCharsets.UTF_8));
                buffer.flip();
                socketChannel.write(buffer);
                buffer.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

AIO

Java1.7引入的异步非阻塞IO模型,基于NIO框架构建,进一步简化了异步IO操作的开发。在AIO模型中,应用程序发起IO操作后无需等待结果,操作完成后会通过回调函数通知应用程序,从而能够更加高效地处理大量并发请求

服务端示例

public class Server {
    public static void main(String[] args) {
        try (AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open()) {
            serverChannel.bind(new InetSocketAddress(8080));
            serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
                @Override
                public void completed(AsynchronousSocketChannel result, Object attachment) {
                    serverChannel.accept(null, this);// 继续接收下一个连接
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    result.read(buffer, null, new CompletionHandler<Integer, Object>() {
                        @Override
                        public void completed(Integer result1, Object attachment) {
                            buffer.flip();
                            System.out.println("获取客户端数据:" + new String(buffer.array(), 0, buffer.remaining()));
                        }

                        @Override
                        public void failed(Throwable exc, Object attachment) {
                            System.out.println("接收客户端数据异常" + exc.getMessage());
                        }
                    });
                }
                @Override
                public void failed(Throwable exc, Object attachment) {
                    System.out.println("接收客户端连接异常" + exc.getMessage());
                }
            });
            System.in.read();
        } catch (IOException e) {
            System.out.println("服务端启动异常" + e.getMessage());
        }
    }
}

客户端示例

public class Client {
    public static void main(String[] args) {
        try (AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            Future<Void> future = client.connect(new InetSocketAddress("localhost", 8080));
            future.get();
            Scanner scanner = new Scanner(System.in);

            while (true) {
                System.out.print("请输入发送内容:");
                client.write(ByteBuffer.wrap(scanner.nextLine().getBytes(StandardCharsets.UTF_8)));
            }
        } catch (IOException e) {
            System.out.println("客户端启动异常" + e.getMessage());
        } catch (ExecutionException | InterruptedException e) {
            System.out.println("客户端消息异常" + e.getMessage());
        }
    }
}
正文到此结束
本文目录