最新消息:

各种各样的I/O

IO admin 3082浏览 0评论

根据操作的阻塞或非阻塞类型,以及IO的准备就绪、完成事件通知的同步和异步类型,一共有四种不同方式的IO。

  • 同步阻塞IO

在许多web server上,典型的一个连接一个thread的基础,这种类型是IO操作阻塞着应用程序直到完成。

当阻塞式的read方法或write方法被调用时,将有一次上下文切换至kernel中,IO操作会发生,数据会被复制进kernel的 buffer中。然后,kernel buffer会把数据转给用户空间里的应用程序级别的buffer,并且应用程序的thread会被标识为runnable的,此时应用程序会解锁可以从 用户空间的buffer中读取数据。

一个连接一个thread的模型想尝试减少强制一个连接给一个thread的阻塞影响,需要掌握剩下的并发连接不再被IO操作在同一个连接上阻塞。 当连接些都很短且数据延迟都不是很坏的时候这工作得很好。尽管如此,一旦连接变长且数据连接高延迟,可能性就是,线程些被连接长时间抓住不放,因为新连接 的饥饿。如果使用定长的线程池,直到阻塞的线程在阻塞状态中不能被重用以服务新的连接,如果每个新的连接用一个新的线程服务,或者会导致大量的线程会在系 统中被产生,这会演变为漂亮的资源争抢,为了完成高并发的负载,而高上下切换消费。

1
2
3
4
5
ServerSocket server = new ServerSocket(port);
while(true) {
  Socket connection = server.accept();
  spawn-Thread-and-process(connection);
}

简单的一连接一线程server

  • 同步非阻塞IO

这个模型下,设备(网卡)或者连接被设置为非阻塞的,read()和write()操作将不会被阻塞。通常意味着,如果操作不能立即得到结论,将会 返回,带一个error code以指出操作会阻塞(POSIX标准是EWOULDBLOCK)或者是设备临时不可用(POSIX标准是EAGAIN)。由应用程序去检测,直到设 备准备好了并且所有数据被读到。尽量如此,这也不是非常高效,因为每次调用都会激起一次上下文切换给kernel,并且不会考虑数据有没被读到。

  • 带就绪事件的异步非阻塞IO

前面的模型的问题在于,应用程序不得不检测,会忙于等待任务完成。当设备准备好被读写时,有更好的办法通知应用程序吗?这的确就是本模型所提供的好 处。使用特殊的系统调用(因平台而变-linux下使用select()/poll()/epoll(),BSD使用kqueue(),Solaris使 用/dev/poll),应用程序注册感兴趣的点收集IO就绪的信息,从特定的设备(在Linux下使用文件描述符,所有的sockets都被抽象使用了 文件描述符),特定的IO操作(读或写)。然后,这个系统调用被调用,至少其中一个被注册的文件描述符变成ready之前,这调用会被阻塞。一旦这个文件 描述符准备好做IO操作了,就会被取来当作系统调用的返回,然后系统调用就可以在应用程序的loop中被顺序地调用。

准备好的连接处理逻辑经常包括一个用户提供的事件handler,此handler会一起发起非阻塞的read()/write()调用,目的是从 设备取数据给kernel,最终给用户空间的buffer,这会激起上下文切换到kernel。无论如何,通常没有绝对保证,有可能会发生,设备上预期的 由操作系统提供的IO,只是一个指示,设备有可能准备好感兴趣的IO操作了,但read()或write()却不行。尽管如此,与标准情况相比这应该算异 常了。

所以,总结的办法就是,在异步流中获取就绪事件,注册一些事件处理器,当有类似的事件通知被触发的时候抓住他们。正如你所见,所有的事情都可以在一 个单独的线程中完成,即便从多个不同连接过来的多路传输,主要因为select()(这里我选择了典型的系统调用),已经是可以同一时间返回多个 sockets准备就绪的类型。同一时间在多个sockets上返回就绪,这只是一部分好处。这种类型就是经常没提供的非阻塞IO模型。

Java已经抽象出来平台特殊性系统调用的不同,实现了NIO API。Socket 文件描述符被用Channels和Selector抽象,封装到selection系统调用中。应用程序感兴趣的收集就绪事件,注册到Channel(通 常在ServerSocketChannel上accept()就得到一个SocketChannel),注册的内容是Selector,会得到 SelectionKey,这个SelectionKey就是作为一个handle,这个handle的作用是hold住Channel和注册信息。然后 阻塞的select()调用被设置在Selector,它会返回一系列的SelectionKey,然后一个接一个地被程序所指定的事件处理器所处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
elector selector = Selector.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
while(true) {
  int readyChannels = selector.select();
  if(readyChannels == 0) continue;
  Set<SelectionKey> selectedKeys = selector.selectedKeys();
  Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
  while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
  }
}

简单的非阻塞server

  • 带完成事件的异步非阻塞IO

就绪事件只能做到通知你设备socket准备好做事情的程度。应用程序依然不得不做脏活,为了从设备/socket中读数据(更准确地说是通过系 统调用指示操作系统),通过设备的各种思路将数据扔到用户空间的buffer。把任务代理给操作系统在后台运行,一旦完成了让它再通知你,包括从设备到 kernel的buffer再最终到应用程序级别的buffer传送所有的数据,这样岂不是很爽?这就是经常被提到的异步IO模型背后的基础想法。所以需 要操作系统层支持AIO操作。在Linux下从2.6开始在aio POSIX API中被支持,Windows下用I/O Completion Ports支持。

JAVA NIO2在AsynchronousChannel API中一点点支持此模型。

  • 操作系统支持

为了支持就绪和完成事件通知,不同的操作系统提供了各种各样的系统调用。就绪事件 select()和poll()可以在Linux类的系统中使用。尽管如此,更新的epoll()变种更好,因为它比select()和poll()更有 效率。当监控的文件描述符增长时,选择时间在线性增长,这一点上导致了select()不行。在复写文件描述符数组这事上已经臭名昭著。所以每次一被调 用,描述符数组就需要从一个单独的拷贝上重新构建。无论如何这都不是一个优雅的解决方案。

epoll()变体可以按两种办法被配置,边沿触发和层级触发。在边沿触发情况下,只有在相关的描述符上事件被检测到才会发出通知。说了在一个事件 触发通知期间,你的应用程序触发器只会读一半kernel的输入buffer。现在在这个描述符上不会得到通知,甚至到下一个时间周期,除非设备准备好发 更多的数据,否则有一点点数据可读的时候也不会有通知,有足够的数据的时候会导致一次文件描述符的事件。层级触发用另一方式配置,每次数据可读了都会触发 通知。

相比的系统调用还有BSD口味的kqueue,Solaris由于版本不同有/dev/poll或者”Event Completion”。Windows下等价的是“I/O Completion Ports”。

至少在Linux下AIO模型的情况却大不同。Linux中aio的支持看上去埋头在一些意见困扰中,实际地在kernel层面使用就绪事件,同时 在应用程序层面提供异步完成事件的抽象。尽管如此,Windows看上去通过“I/O Completion Ports”支持这个得了第一名。

  • IO模式101

在软件开发中到处是设计模式。I/O不一样。只有两种I/O模式,NIO和AIO,下面进行介绍。

  • Reactor模式

有许多组件使用这个模式来实现。我会解释一遍先,后面好看懂代码。

Reactor启动器:这是会初始化非阻塞服务器的组件,主要是配置和初始化分配器(dispatcher)。首先,它会bind出服务器的 socket,并且通过分接器(demultiplexer)注册,分接器作用是客户端连接接收就绪事件。然后就绪事件(读写接收等)的每种类型的事件处 理器实现会被注册到分配器(dispatcher)。下一次分配器事件loop过程会被调起来,以处理事件通知。

dispatcher:为注册、删除定义接口,分发事件处理器起作用,作用是响应连接事件,包括连接被接受、数据输入输出、一组连接上的超时事件。 为了服务一个客户端连接,相关的事件处理器(比如接受事件处理器)会被注册给被接受的客户端通道(在client socket其下包装),注册内容是分接器(demultiplexer),就绪事件类的都被会注册,以监听此特定的channel。然后,分配器线程会 调出阻塞的就绪选择操作,这些操作在demultiplexer之上,主要为剩下的注册通道。一旦一个或多个被注册的通道准备好IO,分配器会服务给相关 的每个准备好的通道一对一的用注册的事件处理器返回“Handle”。很重要的是,这些事件处理器不会hold住分配器线程,但是会延迟分配器服务其他准 备好的连接。因为常见的在事件处理器里的逻辑,包括传送数据从/去准备好的连接,这些连接会阻塞,一直到所有的数据在用户空间和内核数据缓存中被送完,一 般情况下,这些处理器跑在一个线程池的不同的线程里。

Handle:当一个channel被注册了分接器(demultiplexer)就会返回一个handle,handle概括了连接通道和就绪信息。靠分接器就绪选择操作,一系列的准备好的Handle会被返回。Java NIO里对等的叫SelectionKey。

Demultiplexer:(分接器:54chen专门瞎翻)等待在一个或多个注册的连接通道里的就绪事件。Java NIO里叫Selector。

Event Handler:指接口具有的hook方法,以分配连接事件。这些方法需要被应用程序指定的事件处理器所实现。

Concrete Event Handler:(具体的事件处理器)包括从连接中读写数据的逻辑,并且要做一些必须的过程,或者初始化客户端连接传过的接收协议,这些协议来自通过的Handle。

567d2c5bb154b12a2e15ba1c86805ecc

事件处理器典型地跑在一个线程池的单独的线程中,下面的图片中显示了这一过程。

fa4c14050628b38d9f28090f48dbcf91

一个简单的echo server实现,下面的例子显示了这种模式(没有事件处理器线程池)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
public class ReactorInitiator {
private static final int NIO_SERVER_PORT = 9993;
  public void initiateReactiveServer(int port) throws Exception {
    ServerSocketChannel server = ServerSocketChannel.open();
    server.socket().bind(new InetSocketAddress(port));
    server.configureBlocking(false);
    Dispatcher dispatcher = new Dispatcher();
    dispatcher.registerChannel(SelectionKey.OP_ACCEPT, server);
    dispatcher.registerEventHandler(
      SelectionKey.OP_ACCEPT, new AcceptEventHandler(
      dispatcher.getDemultiplexer()));
    dispatcher.registerEventHandler(
      SelectionKey.OP_READ, new ReadEventHandler(
      dispatcher.getDemultiplexer()));
    dispatcher.registerEventHandler(
    SelectionKey.OP_WRITE, new WriteEventHandler());
    dispatcher.run(); // Run the dispatcher loop
 }
  public static void main(String[] args) throws Exception {
    System.out.println("Starting NIO server at port : " +
      NIO_SERVER_PORT);
    new ReactorInitiator().
      initiateReactiveServer(NIO_SERVER_PORT);
  }
}
public class Dispatcher {
  private Map<Integer, EventHandler> registeredHandlers =
    new ConcurrentHashMap<Integer, EventHandler>();
  private Selector demultiplexer;
  public Dispatcher() throws Exception {
    demultiplexer = Selector.open();
  }
  public Selector getDemultiplexer() {
    return demultiplexer;
  }
  public void registerEventHandler(
    int eventType, EventHandler eventHandler) {
    registeredHandlers.put(eventType, eventHandler);
  }
  // Used to register ServerSocketChannel with the
  // selector to accept incoming client connections
  public void registerChannel(
    int eventType, SelectableChannel channel) throws Exception {
    channel.register(demultiplexer, eventType);
  }
  public void run() {
    try {
      while (true) { // Loop indefinitely
        demultiplexer.select();
        Set<SelectionKey> readyHandles =
          demultiplexer.selectedKeys();
        Iterator<SelectionKey> handleIterator =
          readyHandles.iterator();
        while (handleIterator.hasNext()) {
          SelectionKey handle = handleIterator.next();
          if (handle.isAcceptable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_ACCEPT);
              handler.handleEvent(handle);
           // Note : Here we don't remove this handle from
           // selector since we want to keep listening to
           // new client connections
          }
          if (handle.isReadable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_READ);
            handler.handleEvent(handle);
            handleIterator.remove();
          }
          if (handle.isWritable()) {
            EventHandler handler =
              registeredHandlers.get(SelectionKey.OP_WRITE);
            handler.handleEvent(handle);
            handleIterator.remove();
          }
        }
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
public interface EventHandler {
   public void handleEvent(SelectionKey handle) throws Exception;
}
public class AcceptEventHandler implements EventHandler {
  private Selector demultiplexer;
  public AcceptEventHandler(Selector demultiplexer) {
    this.demultiplexer = demultiplexer;
  }
  @Override
  public void handleEvent(SelectionKey handle) throws Exception {
    ServerSocketChannel serverSocketChannel =
     (ServerSocketChannel) handle.channel();
    SocketChannel socketChannel = serverSocketChannel.accept();
    if (socketChannel != null) {
      socketChannel.configureBlocking(false);
      socketChannel.register(
        demultiplexer, SelectionKey.OP_READ);
    }
  }
}
public class ReadEventHandler implements EventHandler {
  private Selector demultiplexer;
  private ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
  public ReadEventHandler(Selector demultiplexer) {
    this.demultiplexer = demultiplexer;
  }
  @Override
  public void handleEvent(SelectionKey handle) throws Exception {
    SocketChannel socketChannel =
     (SocketChannel) handle.channel();
    socketChannel.read(inputBuffer); // Read data from client
    inputBuffer.flip();
    // Rewind the buffer to start reading from the beginning
    byte[] buffer = new byte[inputBuffer.limit()];
    inputBuffer.get(buffer);
    System.out.println("Received message from client : " +
      new String(buffer));
    inputBuffer.flip();
    // Rewind the buffer to start reading from the beginning
    // Register the interest for writable readiness event for
    // this channel in order to echo back the message
    socketChannel.register(
      demultiplexer, SelectionKey.OP_WRITE, inputBuffer);
  }
}
public class WriteEventHandler implements EventHandler {
  @Override
  public void handleEvent(SelectionKey handle) throws Exception {
    SocketChannel socketChannel =
      (SocketChannel) handle.channel();
    ByteBuffer inputBuffer = (ByteBuffer) handle.attachment();
    socketChannel.write(inputBuffer);
    socketChannel.close(); // Close connection
  }
}
  • Proactor模式

此模式基于异步IO模型。主要的组件如下。

Proactive启动器:这是初始化异步操作接收客户端连接的实体。经常是服务器应用程序的主线程。注册一个完成处理器,附着在完成分发器上,以逮到连接接收时的异步事件通知。

Asynchronous Operation Processor:异步操作处理器。其职责是异步地抓出IO操作,提供完成事件通知给应用层的完成处理器。操作系统通常会暴露异步IO接口。

Asynchronous Operation:异步操作的运行在独立的内核线程中,靠异步操作处理器来完成。

Completion Dispatcher:其职责是在异步操作完成时,唤回应用程序的完成处理器。当异步操作处理器完成了一次异步初始化操作,完成分发器会进行应用程序自行维护的回调。通常,委派事件通知处理给相对的事件合适的完成处理器。

Completion Handler:这是被实用程序实现的接口,用于处理异步事件完成events。

92cdbc1e36ed912148f51b199a2e14da

让我们来看看如何用新的Java 7里的NIO.2 API来实现这种模式(一个简单的echo server)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
public class ProactorInitiator {
  static int ASYNC_SERVER_PORT = 4333;
  public void initiateProactiveServer(int port)
    throws IOException {
    final AsynchronousServerSocketChannel listener =
      AsynchronousServerSocketChannel.open().bind(
        new InetSocketAddress(port));
     AcceptCompletionHandler acceptCompletionHandler =
       new AcceptCompletionHandler(listener);
     SessionState state = new SessionState();
     listener.accept(state, acceptCompletionHandler);
  }
  public static void main(String[] args) {
    try {
       System.out.println("Async server listening on port : " +
         ASYNC_SERVER_PORT);
       new ProactorInitiator().initiateProactiveServer(
         ASYNC_SERVER_PORT);
    } catch (IOException e) {
     e.printStackTrace();
    }
    // Sleep indefinitely since otherwise the JVM would terminate
    while (true) {
      try {
        Thread.sleep(Long.MAX_VALUE);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}
public class AcceptCompletionHandler
  implements
    CompletionHandler<AsynchronousSocketChannel, SessionState> {
  private AsynchronousServerSocketChannel listener;
  public AcceptCompletionHandler(
    AsynchronousServerSocketChannel listener) {
    this.listener = listener;
  }
  @Override
  public void completed(AsynchronousSocketChannel socketChannel,
    SessionState sessionState) {
   // accept the next connection
   SessionState newSessionState = new SessionState();
   listener.accept(newSessionState, this);
   // handle this connection
   ByteBuffer inputBuffer = ByteBuffer.allocate(2048);
   ReadCompletionHandler readCompletionHandler =
     new ReadCompletionHandler(socketChannel, inputBuffer);
   socketChannel.read(
     inputBuffer, sessionState, readCompletionHandler);
  }
  @Override
  public void failed(Throwable exc, SessionState sessionState) {
   // Handle connection failure...
  }
}
public class ReadCompletionHandler implements
  CompletionHandler<Integer, SessionState> {
   private AsynchronousSocketChannel socketChannel;
   private ByteBuffer inputBuffer;
   public ReadCompletionHandler(
     AsynchronousSocketChannel socketChannel,
     ByteBuffer inputBuffer) {
     this.socketChannel = socketChannel;
     this.inputBuffer = inputBuffer;
   }
   @Override
   public void completed(
     Integer bytesRead, SessionState sessionState) {
     byte[] buffer = new byte[bytesRead];
     inputBuffer.rewind();
     // Rewind the input buffer to read from the beginning
     inputBuffer.get(buffer);
     String message = new String(buffer);
     System.out.println("Received message from client : " +
       message);
     // Echo the message back to client
     WriteCompletionHandler writeCompletionHandler =
       new WriteCompletionHandler(socketChannel);
     ByteBuffer outputBuffer = ByteBuffer.wrap(buffer);
     socketChannel.write(
       outputBuffer, sessionState, writeCompletionHandler);
  }
  @Override
  public void failed(Throwable exc, SessionState attachment) {
    //Handle read failure.....
   }
}
public class WriteCompletionHandler implements
  CompletionHandler<Integer, SessionState> {
  private AsynchronousSocketChannel socketChannel;
  public WriteCompletionHandler(
    AsynchronousSocketChannel socketChannel) {
    this.socketChannel = socketChannel;
  }
  @Override
  public void completed(
    Integer bytesWritten, SessionState attachment) {
    try {
      socketChannel.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
  @Override
  public void failed(Throwable exc, SessionState attachment) {
   // Handle write failure.....
  }
}
public class SessionState {
  private Map<String, String> sessionProps =
    new ConcurrentHashMap<String, String>();
   public String getProperty(String key) {
     return sessionProps.get(key);
   }
   public void setProperty(String key, String value) {
     sessionProps.put(key, value);
   }
}

每种类型的事件完成(接受、读、写)都会被一个单独的完成处理器handle,这个处理器实现了CompletionHandler接口(Accept/ Read/ WriteCompletionHandler等)。状态过渡被管理在这些连接处理器中。额外SessionState参数可以被用于hold客户端的session,待定的状态就可以跨这一系列的完成事件了。

  • NIO框架(HTTP核心)

如果你在考虑实现一个NIO的HTTP服务器,你有福了。Apache HTTPCore包对使用NIO处理HTTP流量提供了优秀的支持。API在内置的用NIO对付http请求处理层之上提供了高层次的抽象。下面给出一个 最小化的非阻塞Http服务器实现,任何的GET访问都会返回一个样本输出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
public class NHttpServer {
  public void start() throws IOReactorException {
    HttpParams params = new BasicHttpParams();
    // Connection parameters
    params.
      setIntParameter(
        HttpConnectionParams.SO_TIMEOUT, 60000)
     .setIntParameter(
       HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
     .setBooleanParameter(
       HttpConnectionParams.STALE_CONNECTION_CHECK, true)
     .setBooleanParameter(
       HttpConnectionParams.TCP_NODELAY, true);
    final DefaultListeningIOReactor ioReactor =
      new DefaultListeningIOReactor(2, params);
    // Spawns an IOReactor having two reactor threads
    // running selectors. Number of threads here is
    // usually matched to the number of processor cores
    // in the system
    // Application specific readiness event handler
    ServerHandler handler = new ServerHandler();
    final IOEventDispatch ioEventDispatch =
      new DefaultServerIOEventDispatch(handler, params);
    // Default IO event dispatcher encapsulating the
    // event handler
    ListenerEndpoint endpoint = ioReactor.listen(
      new InetSocketAddress(4444));
    // start the IO reactor in a new separate thread
    Thread t = new Thread(new Runnable() {
      public void run() {
        try {
          System.out.println("Listening in port 4444");
          ioReactor.execute(ioEventDispatch);
        } catch (InterruptedIOException ex) {
          ex.printStackTrace();
        } catch (IOException e) {
          e.printStackTrace();
        } catch (Exception e) {
          e.printStackTrace();
        }
    }
    });
    t.start();
    // Wait for the endpoint to become ready,
    // i.e. for the listener to start accepting requests.
    try {
      endpoint.waitFor();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
  public static void main(String[] args)
    throws IOReactorException {
    new NHttpServer().start();
  }
}
public class ServerHandler implements NHttpServiceHandler {
 private static final int BUFFER_SIZE = 2048;
 private static final String RESPONSE_SOURCE_BUFFER =
 "response-source-buffer";
 // the factory to create HTTP responses
 private final HttpResponseFactory responseFactory;
 // the HTTP response processor
 private final HttpProcessor httpProcessor;
 // the strategy to re-use connections
 private final ConnectionReuseStrategy connStrategy;
 // the buffer allocator
 private final ByteBufferAllocator allocator;
 public ServerHandler() {
   super();
   this.responseFactory = new DefaultHttpResponseFactory();
   this.httpProcessor = new BasicHttpProcessor();
   this.connStrategy = new DefaultConnectionReuseStrategy();
   this.allocator = new HeapByteBufferAllocator();
 }
 @Override
 public void connected(
   NHttpServerConnection nHttpServerConnection) {
   System.out.println("New incoming connection");
 }
 @Override
 public void requestReceived(
   NHttpServerConnection nHttpServerConnection) {
   HttpRequest request =
     nHttpServerConnection.getHttpRequest();
   if (request instanceof HttpEntityEnclosingRequest) {
     // Handle POST and PUT requests
   } else {
     ContentOutputBuffer outputBuffer =
       new SharedOutputBuffer(
         BUFFER_SIZE, nHttpServerConnection, allocator);
     HttpContext context =
       nHttpServerConnection.getContext();
     context.setAttribute(
       RESPONSE_SOURCE_BUFFER, outputBuffer);
     OutputStream os =
       new ContentOutputStream(outputBuffer);
     // create the default response to this request
     ProtocolVersion httpVersion =
     request.getRequestLine().getProtocolVersion();
     HttpResponse response =
       responseFactory.newHttpResponse(
         httpVersion, HttpStatus.SC_OK,
         nHttpServerConnection.getContext());
     // create a basic HttpEntity using the source
     // channel of the response pipe
     BasicHttpEntity entity = new BasicHttpEntity();
     if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) {
       entity.setChunked(true);
     }
     response.setEntity(entity);
     String method = request.getRequestLine().
       getMethod().toUpperCase();
     if (method.equals("GET")) {
       try {
         nHttpServerConnection.suspendInput();
         nHttpServerConnection.submitResponse(response);
         os.write(new String("Hello client..").
           getBytes("UTF-8"));
         os.flush();
         os.close();
     } catch (Exception e) {
       e.printStackTrace();
     }
    } // Handle other http methods
   }
 }
 @Override
 public void inputReady(
    NHttpServerConnection nHttpServerConnection,
    ContentDecoder contentDecoder) {
    // Handle request enclosed entities here by reading
    // them from the channel
 }
 @Override
 public void responseReady(
    NHttpServerConnection nHttpServerConnection) {
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
 @Override
 public void outputReady(
   NHttpServerConnection nHttpServerConnection,
   ContentEncoder encoder) {
   HttpContext context = nHttpServerConnection.getContext();
   ContentOutputBuffer outBuf =
    (ContentOutputBuffer) context.getAttribute(
      RESPONSE_SOURCE_BUFFER);
   try {
     outBuf.produceContent(encoder);
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
 @Override
 public void exception(
   NHttpServerConnection nHttpServerConnection,
   IOException e) {
   e.printStackTrace();
 }
 @Override
 public void exception(
   NHttpServerConnection nHttpServerConnection,
   HttpException e) {
   e.printStackTrace();
 }
 @Override
 public void timeout(
   NHttpServerConnection nHttpServerConnection) {
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
 @Override
 public void closed(
   NHttpServerConnection nHttpServerConnection) {
   try {
     nHttpServerConnection.close();
   } catch (IOException e) {
     e.printStackTrace();
   }
 }
}

IOReactor类将基础地包装了分配器定义,靠的是ServerHandler的实现来处理就绪事件。

Apache Synapse(一个开源的ESB)包括了一个好的实现,此实现是个NIO基础的HTTP服务器,在其中NIO被用于扩展每个实例接巨量客户端,但又不会 使内存随时间上涨。实现也包括了不错的debug和服务器统计收集算法,还集成了Axis2传输框架。可以在[1]中找到。

  • 结论

IO上有许多的选项可以做,可影响到服务器的扩展性和性能。上面的每种IO都有利有弊,做决定时要考虑扩展性和性能特征,以及利于管理。得到结论,长篇一张关于IO。尽管提建议、改正和你想的评论。所有提及的clients代码都可以从这里下载。

  • 参考文献

过程中有许多文献一眼就过了,下面是有趣的一些。

[1] http://www.ibm.com/developerworks/java/library/j-nio2-1/index.html

[2] http://www.ibm.com/developerworks/linux/library/l-async/

[3] http://lse.sourceforge.net/io/aionotes.txt

[4] http://wknight8111.blogspot.com/search/label/AIO

[5] http://nick-black.com/dankwiki/index.php/Fast_UNIX_Servers

[6] http://today.java.net/pub/a/today/2007/02/13/architecture-of-highly-scalable-nio-server.html

[7] Java NIO by Ron Hitchens

[8] http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf

[9] http://www.cs.wustl.edu/~schmidt/PDF/proactor.pdf

[10] http://www.kegel.com/c10k.html

[*] 读过就想翻是病,得治。翻得匆忙,有错误的地方麻烦指出修正。

转载请注明:爱开源 » 各种各样的I/O

您必须 登录 才能发表评论!