激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達(dá)式|C/C++|IOS|C#|Swift|Android|VB|R語(yǔ)言|JavaScript|易語(yǔ)言|vb.net|

服務(wù)器之家 - 編程語(yǔ)言 - Java教程 - 從操作系統(tǒng)層面分析Java IO演進(jìn)之路

從操作系統(tǒng)層面分析Java IO演進(jìn)之路

2021-07-01 01:04阿里技術(shù) Java教程

本文從操作系統(tǒng)實(shí)際調(diào)用角度(以CentOS Linux release 7.5操作系統(tǒng)為示例),力求追根溯源看IO的每一步操作到底發(fā)生了什么。

從操作系統(tǒng)層面分析Java IO演進(jìn)之路

 前言

 

本文從操作系統(tǒng)實(shí)際調(diào)用角度(以CentOS Linux release 7.5操作系統(tǒng)為示例),力求追根溯源看IO的每一步操作到底發(fā)生了什么。

關(guān)于如何查看系統(tǒng)調(diào)用,Linux可以使用 strace 來(lái)查看任何軟件的系統(tǒng)調(diào)動(dòng)(這是個(gè)很好的分析學(xué)習(xí)方法):strace -ff -o ./out java TestJava

一 BIO

 

  1. /** 
  2.  * Alipay.com Inc. Copyright (c) 2004-2020 All Rights Reserved. 
  3.  */ 
  4. package io;  
  5.  
  6. import java.io.*; 
  7. import java.net.ServerSocket; 
  8. import java.net.Socket; 
  9.  
  10. /** 
  11.  * @author xiangyong.ding 
  12.  * @version $Id: TestSocket.java, v 0.1 2020年08月02日 20:56 xiangyong.ding Exp $ 
  13.  */ 
  14. public class BIOSocket { 
  15.     public static void main(String[] args) throws IOException { 
  16.         ServerSocket serverSocket = new ServerSocket(8090); 
  17.         System.out.println("step1: new ServerSocket "); 
  18.         while (true) { 
  19.             Socket client = serverSocket.accept(); 
  20.             System.out.println("step2: client\t" + client.getPort()); 
  21.             new Thread(() -> { 
  22.                 try { 
  23.                     InputStream in = client.getInputStream(); 
  24.                     BufferedReader reader = new BufferedReader(new InputStreamReader(in)); 
  25.                     while (true) { 
  26.                         System.out.println(reader.readLine()); 
  27.                     } 
  28.                 } catch (IOException e) { 
  29.                     e.printStackTrace(); 
  30.                 } 
  31.             }).start(); 
  32.         } 
  33.     } 

1 發(fā)生的系統(tǒng)調(diào)用

啟動(dòng)時(shí)

  1. socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 5 
  2. bind(5, {sa_family=AF_INET, sin_port=htons(8090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 
  3. listen(5, 50)                           = 0 
  4. poll([{fd=5, events=POLLIN|POLLERR}], 1, -1) = 1 ([{fd=5, revents=POLLIN}]) 

poll函數(shù)會(huì)阻塞直到其中任何一個(gè)fd發(fā)生事件。

有客戶端連接后

  1. accept(5, {sa_family=AF_INET, sin_port=htons(10253), sin_addr=inet_addr("42.120.74.252")}, [16]) = 6 
  2. clone(child_stack=0x7f013e5c4fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f013e5c59d0,         tls=0x7f013e5c5700, child_tidptr=0x7f013e5c59d0) = 13168 
  3. poll([{fd=5, events=POLLIN|POLLERR}], 1, -1 

拋出線程(即我們代碼里的 new Thread() )后,繼續(xù)poll阻塞等待連接。

clone出來(lái)的線程

  1. recvfrom(6, "hello,bio\n", 8192, 0, NULLNULL) = 

關(guān)于對(duì)recvfrom函數(shù)的說(shuō)明,其中第四個(gè)參數(shù)0 表示這是一個(gè)阻塞調(diào)用。

客戶端發(fā)送數(shù)據(jù)后

  1. recvfrom(6, "hello,bio\n", 8192, 0, NULLNULL) = 10 

2 優(yōu)缺點(diǎn)

優(yōu)點(diǎn)

代碼簡(jiǎn)單,邏輯清晰。

缺點(diǎn)

  • 由于stream的read操作是阻塞讀,面對(duì)多個(gè)連接時(shí) 每個(gè)連接需要每線程。無(wú)法處理大量連接(C10K問(wèn)題)。
  • 誤區(qū):可見(jiàn)JDK1.8中對(duì)于最初的BIO,在Linux OS下仍然使用的poll,poll本身也是相對(duì)比較高效的多路復(fù)用函數(shù)(支持非阻塞、多個(gè)socket同時(shí)檢查event),只是限于JDK最初的stream API限制,無(wú)法支持非阻塞讀取。

二 NIO(non block)

 

改進(jìn):使用NIO API,將阻塞變?yōu)榉亲枞?不需要大量線程。

  1. /** 
  2.  * Alipay.com Inc. Copyright (c) 2004-2020 All Rights Reserved. 
  3.  */ 
  4. package io; 
  5.  
  6. import java.io.IOException; 
  7. import java.net.InetSocketAddress; 
  8. import java.nio.ByteBuffer; 
  9. import java.nio.channels.ServerSocketChannel; 
  10. import java.nio.channels.SocketChannel; 
  11. import java.util.LinkedList; 
  12.  
  13. /** 
  14.  * @author xiangyong.ding 
  15.  * @version $Id: NioSocket.java, v 0.1 2020年08月09日 11:25 xiangyong.ding Exp $ 
  16.  */ 
  17. public class NIOSocket { 
  18.     private static LinkedList<SocketChannel> clients = new LinkedList<>(); 
  19.  
  20.     private static void startClientChannelHandleThread(){ 
  21.         new Thread(() -> { 
  22.             while (true){ 
  23.                 ByteBuffer buffer = ByteBuffer.allocateDirect(4096); 
  24.  
  25.                 //處理客戶端連接 
  26.                 for (SocketChannel c : clients) { 
  27.                     // 非阻塞, >0 表示讀取到的字節(jié)數(shù)量, 0或-1表示未讀取到或讀取異常 
  28.                     int num = 0; 
  29.                     try { 
  30.                         num = c.read(buffer); 
  31.                     } catch (IOException e) { 
  32.                         e.printStackTrace(); 
  33.                     } 
  34.  
  35.                     if (num > 0) { 
  36.                         buffer.flip(); 
  37.                         byte[] clientBytes = new byte[buffer.limit()]; 
  38.                         //從緩沖區(qū) 讀取到內(nèi)存中 
  39.                         buffer.get(clientBytes); 
  40.  
  41.                         System.out.println(c.socket().getPort() + ":" + new String(clientBytes)); 
  42.  
  43.                         //清空緩沖區(qū) 
  44.                         buffer.clear(); 
  45.                     } 
  46.                 } 
  47.             } 
  48.         }).start(); 
  49.     } 
  50.  
  51.     public static void main(String[] args) throws IOException { 
  52.         //new socket,開(kāi)啟監(jiān)聽(tīng) 
  53.         ServerSocketChannel socketChannel = ServerSocketChannel.open(); 
  54.         socketChannel.bind(new InetSocketAddress(9090)); 
  55.         //設(shè)置阻塞接受客戶端連接 
  56.         socketChannel.configureBlocking(true); 
  57.  
  58.         //開(kāi)始client處理線程 
  59.         startClientChannelHandleThread(); 
  60.  
  61.         while (true) { 
  62.             //接受客戶端連接; 非阻塞,無(wú)客戶端返回null(操作系統(tǒng)返回-1) 
  63.             SocketChannel client = socketChannel.accept(); 
  64.  
  65.             if (client == null) { 
  66.                 //System.out.println("no client"); 
  67.             } else { 
  68.                 //設(shè)置讀非阻塞 
  69.                 client.configureBlocking(false); 
  70.  
  71.                 int port = client.socket().getPort(); 
  72.                 System.out.println("client port :" + port); 
  73.  
  74.                 clients.add(client); 
  75.             } 
  76.         } 
  77.     } 

1 發(fā)生的系統(tǒng)調(diào)用

主線程

  1. socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4 
  2. bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 
  3. listen(4, 50)                           = 0 
  4. fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0 
  5. accept(4, 0x7fe26414e680, 0x7fe26c376710) = -1 EAGAIN (Resource temporarily unavailable) 

有連接后,子線程

  1. read(6, 0x7f3f415b1c50, 4096)           = -1 EAGAIN (Resource temporarily unavailable) 
  2. read(6, 0x7f3f415b1c50, 4096)           = -1 EAGAIN (Resource temporarily unavailable) 
  3. ... 

資源使用情況:

從操作系統(tǒng)層面分析Java IO演進(jìn)之路

2 優(yōu)缺點(diǎn)

優(yōu)點(diǎn)

線程數(shù)大大減少。

缺點(diǎn)

需要程序自己掃描 每個(gè)連接read,需要 O(n)時(shí)間復(fù)雜度系統(tǒng)調(diào)用 (此時(shí)可能只有一個(gè)連接發(fā)送了數(shù)據(jù)),高頻系統(tǒng)調(diào)用(導(dǎo)致CPU 用戶態(tài)內(nèi)核態(tài)切換)高。導(dǎo)致CPU消耗很高。

三 多路復(fù)用器(select、poll、epoll)

 

改進(jìn):不需要用戶掃描所有連接,由kernel 給出哪些連接有數(shù)據(jù),然后應(yīng)用從有數(shù)據(jù)的連接讀取數(shù)據(jù)。

1 epoll

  1. import java.net.InetSocketAddress; 
  2. import java.nio.ByteBuffer; 
  3. import java.nio.channels.SelectionKey; 
  4. import java.nio.channels.Selector; 
  5. import java.nio.channels.ServerSocketChannel; 
  6. import java.nio.channels.SocketChannel; 
  7. import java.util.Iterator; 
  8. import java.util.LinkedList; 
  9. import java.util.Set
  10.  
  11. /** 
  12.  * 多路復(fù)用socket 
  13.  * 
  14.  * @author xiangyong.ding 
  15.  * @version $Id: MultiplexingSocket.java, v 0.1 2020年08月09日 12:19 xiangyong.ding Exp $ 
  16.  */ 
  17. public class MultiplexingSocket { 
  18.  
  19.     static ByteBuffer buffer = ByteBuffer.allocateDirect(4096); 
  20.  
  21.     public static void main(String[] args) throws Exception { 
  22.  
  23.         LinkedList<SocketChannel> clients = new LinkedList<>(); 
  24.  
  25.         //1.啟動(dòng)server 
  26.         //new socket,開(kāi)啟監(jiān)聽(tīng) 
  27.         ServerSocketChannel socketChannel = ServerSocketChannel.open(); 
  28.         socketChannel.bind(new InetSocketAddress(9090)); 
  29.         //設(shè)置非阻塞,接受客戶端 
  30.         socketChannel.configureBlocking(false); 
  31.  
  32.         //多路復(fù)用器(JDK包裝的代理,select /poll/epoll/kqueue) 
  33.         Selector selector = Selector.open(); //java自動(dòng)代理,默認(rèn)為epoll 
  34.         //Selector selector = PollSelectorProvider.provider().openSelector();//指定為poll 
  35.  
  36.         //將服務(wù)端socket 注冊(cè)到 多路復(fù)用器 
  37.         socketChannel.register(selector, SelectionKey.OP_ACCEPT); 
  38.  
  39.         //2. 輪訓(xùn)多路復(fù)用器 
  40.         // 先詢問(wèn)有沒(méi)有連接,如果有則返回?cái)?shù)量以及對(duì)應(yīng)的對(duì)象(fd) 
  41.         while (selector.select() > 0) { 
  42.             System.out.println(); 
  43.             Set<SelectionKey> selectionKeys = selector.selectedKeys(); 
  44.             Iterator<SelectionKey> iter = selectionKeys.iterator(); 
  45.  
  46.             while (iter.hasNext()) { 
  47.                 SelectionKey key = iter.next(); 
  48.                 iter.remove(); 
  49.  
  50.                 //2.1 處理新的連接 
  51.                 if (key.isAcceptable()) { 
  52.                     //接受客戶端連接; 非阻塞,無(wú)客戶端返回null(操作系統(tǒng)返回-1) 
  53.                     SocketChannel client = socketChannel.accept(); 
  54.                     //設(shè)置讀非阻塞 
  55.                     client.configureBlocking(false); 
  56.  
  57.                     //同樣,把client也注冊(cè)到selector 
  58.                     client.register(selector, SelectionKey.OP_READ); 
  59.                     System.out.println("new client : " + client.getRemoteAddress()); 
  60.                 } 
  61.                 //2.2 處理讀取數(shù)據(jù) 
  62.                 else if (key.isReadable()) { 
  63.                     readDataFromSocket(key); 
  64.                 } 
  65.             } 
  66.         } 
  67.     } 
  68.  
  69.     protected static void readDataFromSocket(SelectionKey key) throws Exception { 
  70.         SocketChannel socketChannel = (SocketChannel) key.channel(); 
  71.         // 非阻塞, >0 表示讀取到的字節(jié)數(shù)量, 0或-1表示未讀取到或讀取異常 
  72.         // 請(qǐng)注意:這個(gè)例子降低復(fù)雜度,不考慮報(bào)文大于buffer size的情況 
  73.         int num = socketChannel.read(buffer); 
  74.  
  75.         if (num > 0) { 
  76.             buffer.flip(); 
  77.             byte[] clientBytes = new byte[buffer.limit()]; 
  78.             //從緩沖區(qū) 讀取到內(nèi)存中 
  79.             buffer.get(clientBytes); 
  80.  
  81.             System.out.println(socketChannel.socket().getPort() + ":" + new String(clientBytes)); 
  82.  
  83.             //清空緩沖區(qū) 
  84.             buffer.clear(); 
  85.         } 
  86.     } 

2 發(fā)生的系統(tǒng)調(diào)用

啟動(dòng)

  1. socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 4 
  2. bind(4, {sa_family=AF_INET, sin_port=htons(9090), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 
  3. listen(4, 50) 
  4. fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0 
  5. epoll_create(256)                       = 7 
  6. epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=4324783852322029573}}) = 0 
  7. epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=158913789956}}) = 0 
  8. epoll_wait(7 

關(guān)于對(duì)epoll_create(對(duì)應(yīng)著Java的 Selector selector = Selector.open()) 的說(shuō)明,本質(zhì)上是在內(nèi)存的操作系統(tǒng)保留區(qū),創(chuàng)建一個(gè)epoll數(shù)據(jù)結(jié)構(gòu)。用于后面當(dāng)有client連接時(shí),向該epoll區(qū)中添加監(jiān)聽(tīng)。

有連接

  1. epoll_wait(7,[{EPOLLIN, {u32=4, u64=158913789956}}], 8192, -1) = 1 
  2. accept(4, {sa_family=AF_INET, sin_port=htons(29597), sin_addr=inet_addr("42.120.74.252")}, [16]) = 8 
  3. fcntl(8, F_SETFL, O_RDWR|O_NONBLOCK)    = 0 
  4. epoll_ctl(7, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=3212844375897800712}}) = 0 

關(guān)于epoll_ctl (對(duì)應(yīng)著Java的 client.register(selector, SelectionKey.OP_READ) )。其中 EPOLLIN 恰好對(duì)應(yīng)著Java的 SelectionKey.OP_READ 即監(jiān)聽(tīng)數(shù)據(jù)到達(dá)讀取事件。

客戶端發(fā)送數(shù)據(jù)

  1. epoll_wait(7,[{EPOLLIN, {u32=8, u64=3212844375897800712}}], 8192, -1) = 1 
  2. read(8, "hello,multiplex\n", 4096)      = 16 
  3. epoll_wait(7, 

note:epoll_wait第四個(gè)參數(shù)-1表示block。

poll 和 epoll 對(duì)比

根據(jù)“1.BIO”中的poll函數(shù)調(diào)用和epoll函數(shù)對(duì)比如下:

從操作系統(tǒng)層面分析Java IO演進(jìn)之路

從操作系統(tǒng)層面分析Java IO演進(jìn)之路

poll和epoll本質(zhì)上都是同步IO, 區(qū)別于BIO的是 多路復(fù)用充分降低了 system call,而epoll更進(jìn)一步,再次降低了system call的時(shí)間復(fù)雜度。

3 優(yōu)缺點(diǎn)

優(yōu)點(diǎn)

  • 線程數(shù)同樣很少,甚至可以把a(bǔ)cceptor線程和worker線程使用同一個(gè)。
  • 時(shí)間復(fù)雜度低,Java實(shí)現(xiàn)的Selector(在Linux OS下使用的epoll函數(shù))支持多個(gè)clientChannel事件的一次性獲取,且時(shí)間復(fù)雜度維持在O(1)。
  • CPU使用低:得益于Selector,我們不用向 “2.NIO”中需要自己一個(gè)個(gè)ClientChannel手動(dòng)去檢查事件,因此使得CPU使用率大大降低。

缺點(diǎn)

  • 數(shù)據(jù)處理麻煩:目前socketChannel.read 讀取數(shù)據(jù)完全是基于字節(jié)的,當(dāng)我們需要需要作為HTTP服務(wù)網(wǎng)關(guān)時(shí),對(duì)于HTTP協(xié)議的處理完全需要自己解析,這是個(gè)龐大、煩雜、容易出錯(cuò)的工作。
  • 性能
    • 現(xiàn)有socket數(shù)據(jù)的讀取(socketChannel.read(buffer))全部通過(guò)一個(gè)buffer 緩沖區(qū)來(lái)接受,一旦連接多起來(lái),這無(wú)疑是一個(gè)單線程讀取,性能無(wú)疑是個(gè)問(wèn)題。
    • 那么此時(shí)buffer我們每次讀取都重新new出來(lái)呢?如果每次都new出來(lái),這樣的內(nèi)存碎片對(duì)于GC無(wú)疑是一場(chǎng)災(zāi)難。如何平衡地協(xié)調(diào)好buffer的共享,既保證性能,又保證線程安全,這是個(gè)難題。

四 Netty

 

1 研究的目標(biāo)源碼(netty提供的入門example)

TelnetServer

  1. package telnet; 
  2.  
  3. import io.netty.bootstrap.ServerBootstrap; 
  4. import io.netty.channel.EventLoopGroup; 
  5. import io.netty.channel.nio.NioEventLoopGroup; 
  6. import io.netty.channel.socket.nio.NioServerSocketChannel; 
  7. import io.netty.handler.logging.LogLevel; 
  8. import io.netty.handler.logging.LoggingHandler; 
  9. import io.netty.handler.ssl.SslContext; 
  10. import io.netty.handler.ssl.SslContextBuilder; 
  11. import io.netty.handler.ssl.util.SelfSignedCertificate; 
  12.  
  13. /** 
  14.  * Simplistic telnet server. 
  15.  */ 
  16. public final class TelnetServer { 
  17.  
  18.     static final boolean SSL = System.getProperty("ssl") != null
  19.     static final int PORT = Integer.parseInt(System.getProperty("port", SSL? "8992" : "8023")); 
  20.  
  21.     public static void main(String[] args) throws Exception { 
  22.         // Configure SSL. 
  23.         final SslContext sslCtx; 
  24.         if (SSL) { 
  25.             SelfSignedCertificate ssc = new SelfSignedCertificate(); 
  26.             sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); 
  27.         } else { 
  28.             sslCtx = null
  29.         } 
  30.  
  31.         EventLoopGroup bossGroup = new NioEventLoopGroup(1); 
  32.         EventLoopGroup workerGroup = new NioEventLoopGroup(); 
  33.         try { 
  34.             ServerBootstrap b = new ServerBootstrap(); 
  35.             b.group(bossGroup, workerGroup) 
  36.              .channel(NioServerSocketChannel.class) 
  37.              .handler(new LoggingHandler(LogLevel.INFO)) 
  38.              .childHandler(new TelnetServerInitializer(sslCtx)); 
  39.  
  40.             b.bind(PORT).sync().channel().closeFuture().sync(); 
  41.         } finally { 
  42.             bossGroup.shutdownGracefully(); 
  43.             workerGroup.shutdownGracefully(); 
  44.         } 
  45.     } 

TelnetServerHandler

  1. package telnet; 
  2.  
  3. import io.netty.channel.ChannelFuture; 
  4. import io.netty.channel.ChannelFutureListener; 
  5. import io.netty.channel.ChannelHandler.Sharable; 
  6. import io.netty.channel.ChannelHandlerContext; 
  7. import io.netty.channel.SimpleChannelInboundHandler; 
  8.  
  9. import java.net.InetAddress; 
  10. import java.util.Date
  11.  
  12. /** 
  13.  * Handles a server-side channel. 
  14.  */ 
  15. @Sharable 
  16. public class TelnetServerHandler extends SimpleChannelInboundHandler<String> { 
  17.  
  18.     @Override 
  19.     public void channelActive(ChannelHandlerContext ctx) throws Exception { 
  20.         // Send greeting for a new connection
  21.         ctx.write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n"); 
  22.         ctx.write("It is " + new Date() + " now.\r\n"); 
  23.         ctx.flush(); 
  24.     } 
  25.  
  26.     @Override 
  27.     public void channelRead0(ChannelHandlerContext ctx, String request) throws Exception { 
  28.         // Generate and write a response. 
  29.         String response; 
  30.         boolean close = false
  31.         if (request.isEmpty()) { 
  32.             response = "Please type something.\r\n"
  33.         } else if ("bye".equals(request.toLowerCase())) { 
  34.             response = "Have a good day!\r\n"
  35.             close = true
  36.         } else { 
  37.             response = "Did you say '" + request + "'?\r\n"
  38.         } 
  39.  
  40.         // We do not need to write a ChannelBuffer here. 
  41.         // We know the encoder inserted at TelnetPipelineFactory will do the conversion. 
  42.         ChannelFuture future = ctx.write(response); 
  43.  
  44.         // Close the connection after sending 'Have a good day!' 
  45.         // if the client has sent 'bye'
  46.         if (close) { 
  47.             future.addListener(ChannelFutureListener.CLOSE); 
  48.         } 
  49.     } 
  50.  
  51.     @Override 
  52.     public void channelReadComplete(ChannelHandlerContext ctx) { 
  53.         ctx.flush(); 
  54.     } 
  55.  
  56.     @Override 
  57.     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 
  58.         cause.printStackTrace(); 
  59.         ctx.close(); 
  60.     } 

TelnetServerInitializer

  1. package telnet; 
  2.  
  3. import io.netty.channel.ChannelInitializer; 
  4. import io.netty.channel.ChannelPipeline; 
  5. import io.netty.channel.socket.SocketChannel; 
  6. import io.netty.handler.codec.DelimiterBasedFrameDecoder; 
  7. import io.netty.handler.codec.Delimiters; 
  8. import io.netty.handler.codec.string.StringDecoder; 
  9. import io.netty.handler.codec.string.StringEncoder; 
  10. import io.netty.handler.ssl.SslContext; 
  11.  
  12. /** 
  13.  * Creates a newly configured {@link ChannelPipeline} for a new channel. 
  14.  */ 
  15. public class TelnetServerInitializer extends ChannelInitializer<SocketChannel> { 
  16.  
  17.     private static final StringDecoder DECODER = new StringDecoder(); 
  18.     private static final StringEncoder ENCODER = new StringEncoder(); 
  19.  
  20.     private static final TelnetServerHandler SERVER_HANDLER = new TelnetServerHandler(); 
  21.  
  22.     private final SslContext sslCtx; 
  23.  
  24.     public TelnetServerInitializer(SslContext sslCtx) { 
  25.         this.sslCtx = sslCtx; 
  26.     } 
  27.  
  28.     @Override 
  29.     public void initChannel(SocketChannel ch) throws Exception { 
  30.         ChannelPipeline pipeline = ch.pipeline(); 
  31.  
  32.         if (sslCtx != null) { 
  33.             pipeline.addLast(sslCtx.newHandler(ch.alloc())); 
  34.         } 
  35.  
  36.         // Add the text line codec combination first
  37.         pipeline.addLast(new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); 
  38.         // the encoder and decoder are static as these are sharable 
  39.         pipeline.addLast(DECODER); 
  40.         pipeline.addLast(ENCODER); 
  41.  
  42.         // and then business logic. 
  43.         pipeline.addLast(SERVER_HANDLER); 
  44.     } 

2 啟動(dòng)后的系統(tǒng)調(diào)用

主線程(23109)

  1. ## 256無(wú)實(shí)際作用,這里只為了兼容舊版kernel api 
  2. epoll_create(256)                       = 7epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=5477705356928876549}}) = 0 
  3.  
  4. epoll_create(256)                       = 10epoll_ctl(10, EPOLL_CTL_ADD, 8, {EPOLLIN, {u32=8, u64=17041805914081853448}}) = 0 
  5.  
  6. epoll_create(256)                       = 13 
  7. epoll_ctl(13, EPOLL_CTL_ADD, 11, {EPOLLIN, {u32=11, u64=17042151607409573899}}) = 0 
  8.  
  9. epoll_create(256)                       = 16 
  10. epoll_ctl(16, EPOLL_CTL_ADD, 14, {EPOLLIN, {u32=14, u64=17042497300737294350}}) = 0 
  11.  
  12. epoll_create(256)                       = 19 
  13. epoll_ctl(19, EPOLL_CTL_ADD, 17, {EPOLLIN, {u32=17, u64=17042561450368827409}}) = 0 
  14.  
  15. epoll_create(256)                       = 10 
  16. socket(AF_INET, SOCK_STREAM, IPPROTO_IP) = 20 
  17. clone(child_stack=0x7fc3c509afb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c509b9d0, tls=0x7fc3c509b700, child_tidptr=0x7fc3c509b9d0) = 23130 

概括為:

  • 向OS新建socket,并開(kāi)啟clone boss線程23130。
  • 為BOSS創(chuàng)建了一個(gè)epoll(論證參見(jiàn)下面“boss”),每個(gè)worker創(chuàng)建一個(gè)epoll數(shù)據(jù)結(jié)構(gòu)(本質(zhì)上是在kernel內(nèi)存區(qū)創(chuàng)建了一個(gè)數(shù)據(jù)結(jié)構(gòu),用于后續(xù)監(jiān)聽(tīng))。
  • 創(chuàng)建boss線程監(jiān)聽(tīng)的socket(本質(zhì)上在kernel中創(chuàng)建一個(gè)數(shù)據(jù)結(jié)構(gòu))。

boss(23130)

  1. bind(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, 16) = 0 
  2. listen(20, 128)                         = 0 
  3. getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0 
  4. getsockname(20, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("0.0.0.0")}, [16]) = 0  
  5.  
  6. ##將fd為7號(hào)epoll和fd為20號(hào)的socket綁定,事件:epoll_ctl_add和epoll_ctl_mod 
  7. epoll_ctl(7, EPOLL_CTL_ADD, 20, {EPOLLIN, {u32=20, u64=14198059139132817428}}) = 0 
  8. epoll_ctl(7, EPOLL_CTL_MOD, 20, {EPOLLIN, {u32=20, u64=20}}) = 0 
  9. epoll_wait(7, [{EPOLLIN, {u32=5, u64=17295150779149058053}}], 8192, 1000) = 1 
  10. epoll_wait(7, [], 8192, 1000)           = 0(不斷輪訓(xùn),1S超時(shí)一次) 

概括為:

  • 將上一步中main線程創(chuàng)建的fd:20綁定端口8023,并開(kāi)啟監(jiān)聽(tīng)(網(wǎng)卡負(fù)責(zé)監(jiān)聽(tīng)和接受連接和數(shù)據(jù),kernel則負(fù)責(zé)路由到具體進(jìn)程,具體參見(jiàn):關(guān)于socket和bind和listen,TODO )。
  • 將7號(hào)socket對(duì)應(yīng)的fd綁定到20號(hào)對(duì)應(yīng)的epoll數(shù)據(jù)結(jié)構(gòu)上去(都是操作kernel中的內(nèi)存)。
  • 開(kāi)始1S中一次阻塞等待epoll有任何連接或數(shù)據(jù)到達(dá)。

3 客戶端連接

boss (23130)

  1. accept(20, {sa_family=AF_INET, sin_port=htons(11144), sin_addr=inet_addr("42.120.74.122")}, [16]) = 24 
  2. getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("192.168.0.120")}, [16]) = 0 
  3. getsockname(24, {sa_family=AF_INET, sin_port=htons(8023), sin_addr=inet_addr("192.168.0.120")}, [16]) = 0 
  4. setsockopt(24, SOL_TCP, TCP_NODELAY, [1], 4) = 0 
  5. getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0 
  6. getsockopt(24, SOL_SOCKET, SO_SNDBUF, [87040], [4]) = 0 
  7. ##拋出 work線程 
  8. clone(child_stack=0x7fc3c4c98fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7fc3c4c999d0, tls=0x7fc3c4c99700, child_tidptr=0x7fc3c4c999d0) = 2301 

worker (2301)

  1. writev(24, [{"Welcome to iZbp14e1g9ztpshfrla9m"..., 37}, {"It is Sun Aug 23 15:44:14 CST 20"..., 41}], 2) = 78 
  2. epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0 
  3. epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=14180008216221450264}}) = 0 
  4. epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1  
  5. read(11, "\1", 128)                     = 1 
  6. ##開(kāi)始無(wú)限loop 
  7. epoll_wait(13, [], 8192, 1000)          = 0 
  8. epoll_wait(13, [{EPOLLIN, {u32=24, u64=24}}], 8192, 1000) = 1 

概括:

  • 當(dāng)BOSS輪訓(xùn)epoll_wait等到了連接后,首先accept得到該socket對(duì)應(yīng)的fd。
  • 連接建立后 BOSS立馬拋出一個(gè)線程(clone函數(shù))。
  • worker(即新建的線程)寫入了一段數(shù)據(jù)(這里是業(yè)務(wù)邏輯)。
  • worker將該client對(duì)應(yīng)的fd綁定到了13號(hào)epoll上。
  • worker繼續(xù)輪訓(xùn)監(jiān)聽(tīng)13號(hào)epoll。

4 客戶端主動(dòng)發(fā)送數(shù)據(jù)

worker(2301)

  1. read(24, "i am daojian\r\n", 1024)      = 14 
  2. write(24, "Did you say 'i am daojian'?\r\n", 29) = 29 
  3. ##繼續(xù)無(wú)限loop 
  4. epoll_wait(13, [], 8192, 1000)          = 0 

概括為:

  • wait到數(shù)據(jù)后,立即read到用戶控件內(nèi)存中(讀取1024個(gè)字節(jié)到 用戶控件某個(gè)buff中)。
  • 寫入數(shù)據(jù)(業(yè)務(wù)邏輯,不必太關(guān)注)。
  • 繼續(xù)輪訓(xùn)等待13號(hào)epoll。

5 客戶端發(fā)送bye報(bào)文,服務(wù)器斷開(kāi)TCP連接

worker(2301)

  1. read(24, "bye\r\n", 1024)               = 5 
  2. write(24, "Have a good day!\r\n", 18)   = 18 
  3. getsockopt(24, SOL_SOCKET, SO_LINGER, {onoff=0, linger=0}, [8]) = 0 
  4. dup2(25, 24)                            = 24 
  5. ##從epoll數(shù)據(jù)結(jié)構(gòu)中(OS)中刪除fd為24的socket 
  6. epoll_ctl(13, EPOLL_CTL_DEL, 24, 0x7f702dd531e0) = -1 ENOENT 
  7. ##關(guān)閉24 socket 
  8. close(24)                               = 0 
  9. ##繼續(xù)等待13 epoll數(shù)據(jù) 
  10. epoll_wait(13, [], 8192, 1000)          = 0 

斷開(kāi)客戶端連接概括為:

  • 從epoll中刪除該客戶端對(duì)應(yīng)的fd(這里觸發(fā)源頭沒(méi)找到,可能是boss)。
  • close關(guān)閉客戶端24號(hào)fd。
  • 繼續(xù)輪訓(xùn)epoll。

6 五個(gè)客戶端同時(shí)連接

boss線程(23130)

  1. accept(20, {sa_family=AF_INET, sin_port=htons(1846), sin_addr=inet_addr("42.120.74.122")}, [16]) = 24 
  2. clone(child_stack=0x7f702cc51fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cc529d0, tls=0x7f702cc52700, child_tidptr=0x7f702cc529d0) = 10035 
  3.  
  4. accept(20, {sa_family=AF_INET, sin_port=htons(42067), sin_addr=inet_addr("42.120.74.122")}, [16]) = 26 
  5. clone(child_stack=0x7f702cb50fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f702cb519d0, tls=0x7f702cb51700, child_tidptr=0x7f702cb519d0) = 10067 
  6.  
  7. ... 

woker線程(10035,第一個(gè)連接)

  1. epoll_ctl(13, EPOLL_CTL_ADD, 24, {EPOLLIN, {u32=24, u64=24}}) = 0 
  2. epoll_ctl(13, EPOLL_CTL_MOD, 24, {EPOLLIN, {u32=24, u64=3226004877247250456}}) = 0 
  3. epoll_wait(13, [{EPOLLIN, {u32=11, u64=17042151607409573899}}], 8192, 1000) = 1                  = 1 
  4. epoll_wait(13, [], 8192, 1000)          = 0 

worker線程(10067,第二個(gè)連接)

  1. epoll_ctl(16, EPOLL_CTL_ADD, 26, {EPOLLIN, {u32=26, u64=26}}) = 0 
  2. epoll_ctl(16, EPOLL_CTL_MOD, 26, {EPOLLIN, {u32=26, u64=3221483685433835546}}) = 0 
  3. epoll_wait(16, [{EPOLLIN, {u32=14, u64=17042497300737294350}}], 8192, 1000) = 1 
  4. epoll_wait(16, [], 8192, 1000)          = 0 
  5. epoll_wait(16, [], 8192, 1000)          = 0 

worker線程(10067,第二個(gè)連接)

  1. epoll_ctl(19, EPOLL_CTL_ADD, 27, {EPOLLIN, {u32=27, u64=27}}) = 0 
  2. epoll_ctl(19, EPOLL_CTL_MOD, 27, {EPOLLIN, {u32=27, u64=3216966479350071323}}) = 0 

worker線程(8055,第四個(gè)連接)

  1. epoll_ctl(10, EPOLL_CTL_ADD, 28, {EPOLLIN, {u32=28, u64=28}}) = 0 
  2. epoll_ctl(10, EPOLL_CTL_MOD, 28, {EPOLLIN, {u32=28, u64=3302604828697427996}}) = 0 

worker線程(10035,第五個(gè)連接,不在clone線程,而是復(fù)用了第一個(gè)epoll對(duì)應(yīng)的worker)

  1. epoll_ctl(13, EPOLL_CTL_ADD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0 
  2. epoll_ctl(13, EPOLL_CTL_MOD, 29, {EPOLLIN, {u32=29, u64=29}}) = 0 

概括為:

  • epoll和boss、worker之間的關(guān)系:一共有4個(gè)worker對(duì)應(yīng)著4個(gè)epoll對(duì)象,boss和每個(gè)worker都有對(duì)應(yīng)自己的epoll。
  • boss根據(jù)epoll數(shù)量,平衡分配連接到每個(gè)worker對(duì)應(yīng)的epoll中。

7 總結(jié)

下圖通過(guò)對(duì)系統(tǒng)調(diào)用的調(diào)查得出 netty 和 kernel 交互圖:

從操作系統(tǒng)層面分析Java IO演進(jìn)之路

初始化直接創(chuàng)建5個(gè)epoll,其中7號(hào)為boss使用,專門用于處理和客戶端連接;其余4個(gè)用來(lái)給worker使用,用戶處理和客戶端的數(shù)據(jù)交互。

work的線程數(shù)量,取決于初始化時(shí)創(chuàng)建了幾個(gè)epoll,worker的復(fù)用本質(zhì)上是epoll的復(fù)用。

work之間為什么要獨(dú)立使用epoll?為什么不共享?

  • 為了避免各個(gè)worker之間發(fā)生爭(zhēng)搶連接處理,netty直接做了物理隔離,避免競(jìng)爭(zhēng)。各個(gè)worker只負(fù)責(zé)處理自己管理的連接,并且后續(xù)該worker中的每個(gè)client的讀寫操作完全由 該線程單獨(dú)處理,天然避免了資源競(jìng)爭(zhēng),避免了鎖。
  • worker單線程,性能考慮:worker不僅僅要epoll_wait,還是處理read、write邏輯,加入worker處理了過(guò)多的連接,勢(shì)必造成這部分消耗時(shí)間片過(guò)多,來(lái)不及處理更多連接,性能下降。

8 優(yōu)缺點(diǎn)

優(yōu)點(diǎn)

  • 數(shù)據(jù)處理:netty提供了大量成熟的數(shù)據(jù)處理組件(ENCODER、DECODER),HTTP、POP3拿來(lái)即用。
  • 編碼復(fù)雜度、可維護(hù)性:netty充分使得業(yè)務(wù)邏輯與網(wǎng)絡(luò)處理解耦,只需要少量的BootStrap配置即可,更多的集中在業(yè)務(wù)邏輯處理上。
  • 性能:netty提供了的ByteBuf(底層Java原生的ByteBuffer),提供了池化的ByteBuf,兼顧讀取性能和ByteBuf內(nèi)存分配(在后續(xù)文檔中會(huì)再做詳解)。

缺點(diǎn)

  • 入門有一定難度。

五 AIO

 

1 啟動(dòng)

main線程

  1. epoll_create(256)                       = 5 
  2. epoll_ctl(5, EPOLL_CTL_ADD, 6, {EPOLLIN, {u32=6, u64=11590018039084482566}}) = 0 
  3.  
  4. ##創(chuàng)建BOSS 線程(Proactor) 
  5. clone(child_stack=0x7f340ac06fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f340ac079d0, tls=0x7f340ac07700, child_tidptr=0x7f340ac079d0) = 22704 
  6.  
  7. socket(AF_INET6, SOCK_STREAM, IPPROTO_IP) = 8 
  8. setsockopt(8, SOL_IPV6, IPV6_V6ONLY, [0], 4) = 0 
  9. setsockopt(8, SOL_SOCKET, SO_REUSEADDR, [1], 4) = 0 
  10. bind(8, {sa_family=AF_INET6, sin6_port=htons(9090), inet_pton(AF_INET6, "::", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, 28) = 0 
  11. listen(8, 50) 
  12.  
  13. accept(8, 0x7f67d01b3120, 0x7f67d9246690) = -1 
  14. epoll_ctl(5, EPOLL_CTL_MOD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=15380749440025362440}}) = -1 ENOENT (No such file or directory) 
  15. epoll_ctl(5, EPOLL_CTL_ADD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=15380749440025362440}}) = 0 
  16. read(0, 

22704(BOSS 線程(Proactor))

  1. epoll_wait(5,  <unfinished ...> 

2 請(qǐng)求連接

22704(BOSS 線程(Proactor))處理連接

  1. epoll_wait(5,[{EPOLLIN, {u32=9, u64=4294967305}}], 512, -1) = 1 
  2. accept(8, {sa_family=AF_INET6, sin6_port=htons(55320), inet_pton(AF_INET6, "::ffff:36.24.32.140", &sin6_addr), sin6_flowinfo=0, sin6_scope_id=0}, [28]) = 9 
  3. clone(child_stack=0x7ff35c99ffb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7ff35c9a09d0, tls=0x7ff35c9a0700, child_tidptr=0x7ff35c9a09d0) = 26241 
  4. epoll_wait(5,  <unfinished ...> 

26241

  1. #將client 連接的FD加入到BOSS的epoll中,以便BOSS線程監(jiān)聽(tīng)網(wǎng)絡(luò)事件 
  2. epoll_ctl(5, EPOLL_CTL_MOD, 9, {EPOLLIN|EPOLLONESHOT, {u32=9, u64=4398046511113}}) = -1 ENOENT (No such file or directory) 
  3. epoll_ctl(5, EPOLL_CTL_ADD, 9, {EPOLLIN|EPOLLONESHOT, {u32=9, u64=4398046511113}}) = 0 
  4. accept(8, 0x7ff3440008c0, 0x7ff35c99f4d0) = -1 EAGAIN (Resource temporarily unavailable) 
  5. epoll_ctl(5, EPOLL_CTL_MOD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=8}}) = 0 

3 客戶端發(fā)送數(shù)據(jù)

22704(BOSS 線程(Proactor))處理連接

  1. epoll_wait(5,[{EPOLLIN, {u32=9, u64=4294967305}}], 512, -1) = 1 
  2. ##數(shù)據(jù)讀出 
  3. read(9, "daojian111\r\n", 1024)         = 12 
  4. ##數(shù)據(jù)處理交給其他線程,這里由于線程池為空,需要先clone線程 
  5. clone(child_stack=0x7ff35c99ffb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7ff35c9a09d0, tls=0x7ff35c9a0700, child_tidptr=0x7ff35c9a09d0) = 26532 

復(fù)制線程處理,線程號(hào)26532

  1. write(1, "pool-1-thread-2-10received : dao"..., 41) = 41 
  2. write(1, "\n", 1) 
  3. accept(8, 0x7f11c400b5f0, 0x7f11f42fd4d0) = -1 EAGAIN (Resource temporarily unavailable) 
  4. epoll_ctl(5, EPOLL_CTL_MOD, 8, {EPOLLIN|EPOLLONESHOT, {u32=8, u64=8}}) = 0 

4 總結(jié)

  • 從系統(tǒng)調(diào)用角度,Java的AIO事實(shí)上是以多路復(fù)用(Linux上為epoll)等同步IO為基礎(chǔ),自行實(shí)現(xiàn)了異步事件分發(fā)。
  • BOSS Thread負(fù)責(zé)處理連接,并分發(fā)事件。
  • WORKER Thread只負(fù)責(zé)從BOSS接收的事件執(zhí)行,不負(fù)責(zé)任何網(wǎng)絡(luò)事件監(jiān)聽(tīng)。

從操作系統(tǒng)層面分析Java IO演進(jìn)之路

5 優(yōu)缺點(diǎn)

優(yōu)點(diǎn)

相比于前面的BIO、NIO,AIO已經(jīng)封裝好了任務(wù)調(diào)度,使用時(shí)只需關(guān)心任務(wù)處理。

缺點(diǎn)

  • 事件處理完全由Thread Pool完成,對(duì)于同一個(gè)channel的多個(gè)事件可能會(huì)出現(xiàn)并發(fā)問(wèn)題。
  • 相比netty,buffer API不友好容易出錯(cuò);編解碼工作復(fù)雜。

原文鏈接:https://zhuanlan.51cto.com/art/202106/669120.htm

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 欧美特级黄色 | 九九热在线精品视频 | 一区二区三区在线观看免费视频 | 在线观看中文字幕av | 麻豆视频在线观看 | 激情视频免费看 | 中国的免费的视频 | 毛片视频大全 | 国产成人精品区 | 黄色大片免费看 | 毛片免费视频观看 | 亚洲乱搞| 欧美黄色性视频 | 激情网站在线观看 | 斗罗破苍穹在线观看免费完整观看 | 欧美一级高潮 | 久久99精品视频在线观看 | 红杏成人性视频免费看 | 成人av一区二区免费播放 | 亚洲男人一区 | 午夜在线小视频 | 亚洲国产一区二区三区 | 在线视频 欧美日韩 | 久久久av影视 | 日美黄色片 | 色综合久久久久久久久久久 | 护士xxxx| 一级做a爱片毛片免费 | 成人羞羞视频在线观看免费 | 国产精品99久久久久久久vr | 国产99久久久久 | 特级a欧美做爰片毛片 | 久久久久9999 | 一级毛片免费高清 | 欧美成人免费小视频 | a一级黄 | 91午夜在线观看 | 午夜a狂野欧美一区二区 | 日韩黄网站| 97人人草| 视频一区国产精品 |