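This article comes from DoubleH's BlogJava blog. Its original title is "A High-Performance Web Server on JDK 7 NIO2 in Practice, Part 2" (《基于JDK7 NIO2的高性能web服务器实践之二》). The first post on this topic was published on the same blog.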
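In the previous post I briefly described how to add TransmitFile support to NIO2. File-transfer throughput is one performance concern; the number of concurrent connections is another.
JDK 7 again takes the simple route here: it does not allow several AcceptEx requests to be posted at once. Only one may be outstanding, and the next can be posted only after it returns, so the rate at which client connections are accepted inevitably suffers. I do not know why Sun implemented it this way. Allowing only one WSASend()/WSARecv() at a time is understandable: it simplifies programming because out-of-order packets need not be considered, and it lowers the risk of exhausting memory. AcceptEx has no such justification.
So, once again for the sake of performance, I added support for posting several accepts at the same time.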
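The one-at-a-time limit can be observed through the public API alone. The following is a minimal sketch, not part of the original post: the class name `SingleAcceptLimitDemo` and its helper method are made up for illustration. It binds a listener to a loopback port that nobody connects to, starts one accept, and then tries to start a second one while the first is still pending.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.AcceptPendingException;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.Future;

// Hypothetical demo class; only standard java.nio.channels calls are used.
class SingleAcceptLimitDemo {

    /** Returns true if a second accept() is refused while the first is still pending. */
    static boolean secondAcceptIsRejected() throws IOException {
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open()) {
            // Port 0 lets the OS choose a free port; no client connects,
            // so the first accept stays pending.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Future<AsynchronousSocketChannel> first = server.accept();
            try {
                server.accept(); // second accept on the same channel
                return false;
            } catch (AcceptPendingException expected) {
                return true;
            } finally {
                first.cancel(true);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("second accept rejected: " + secondAcceptIsRejected());
    }
}
```

With the stock channel, an application therefore has to wait for each completion before posting the next accept, which is the behaviour the rest of this post works around.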
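In addition, after AcceptEx returns, the default JDK 7 implementation sets the remote and local InetSocketAddress in a very inefficient way: four calls to getsockname through JNI, two to fetch the sockaddr and two to fetch the port. I do all of this in a single GetAcceptExSockaddrs call, which improves efficiency further.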
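GetAcceptExSockaddrs reads both addresses out of the buffer that AcceptEx filled, so that buffer has to be large enough for two address slots. A small sketch of the arithmetic behind the 88-byte `ONE_DATA_BUFFER_SIZE` used in the listing, under stated assumptions: the native sizes below (16 bytes for `sockaddr_in`, 28 bytes for `sockaddr_in6`) are the usual Winsock values, and `SOCKETADDRESS` is assumed to be a union big enough for the IPv6 form. AcceptEx itself requires each address slot to be at least 16 bytes larger than the largest address of the transport in use. The class name is hypothetical.

```java
// Hypothetical helper that only illustrates the buffer-size arithmetic.
class AcceptExBufferSize {
    // Assumed native struct sizes on Windows.
    static final int SOCKADDR_IN_SIZE = 16;   // IPv4 address
    static final int SOCKADDR_IN6_SIZE = 28;  // IPv6 address
    // Extra bytes AcceptEx requires on top of the largest address.
    static final int ACCEPTEX_PADDING = 16;

    /** Size of one address slot (local or remote). */
    static int addressSlotSize() {
        return Math.max(SOCKADDR_IN_SIZE, SOCKADDR_IN6_SIZE) + ACCEPTEX_PADDING;
    }

    /** Size of the whole buffer: one local slot plus one remote slot, no receive data. */
    static int bufferSize() {
        return 2 * addressSlotSize();
    }

    public static void main(String[] args) {
        System.out.println("slot = " + addressSlotSize() + ", buffer = " + bufferSize());
    }
}
```

Sizing the slot for IPv6 means the same buffer works whether the client connects over IPv4 or IPv6.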
Here is the Java part first. The framework is the same as JDK 7's; the details are handled differently:
- /**
- *
- */
- package sun.nio.ch;
- import java.io.IOException;
- import java.lang.reflect.Field;
- import java.lang.reflect.Method;
- import java.net.InetAddress;
- import java.net.InetSocketAddress;
- import java.nio.channels.AcceptPendingException;
- import java.nio.channels.AsynchronousCloseException;
- import java.nio.channels.AsynchronousServerSocketChannel;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.ClosedChannelException;
- import java.nio.channels.CompletionHandler;
- import java.nio.channels.NotYetBoundException;
- import java.nio.channels.ShutdownChannelGroupException;
- import java.security.AccessControlContext;
- import java.security.AccessController;
- import java.security.PrivilegedAction;
- import java.util.Queue;
- import java.util.concurrent.ConcurrentLinkedQueue;
- import java.util.concurrent.Future;
- import java.util.concurrent.atomic.AtomicBoolean;
- import java.util.concurrent.atomic.AtomicInteger;
- import sun.misc.Unsafe;
- /**
- * Allows multiple AcceptEx requests to be posted on the completion port at once, which raises the rate at which connections can be accepted.
- * @author Yvon
- *
- */
- public class WindowsMultiAcceptSupport {
- WindowsAsynchronousServerSocketChannelImpl schannel;
- private static final Unsafe unsafe = Unsafe.getUnsafe();
- // 2 * (sizeof(SOCKETADDRESS) + 16)
- private static final int ONE_DATA_BUFFER_SIZE = 88;
- private long handle;
- private Iocp iocp;
- // typically there will be zero, or one I/O operations pending. In rare
- // cases there may be more. These rare cases arise when a sequence of accept
- // operations complete immediately and handled by the initiating thread.
- // The corresponding OVERLAPPED cannot be reused/released until the completion
- // event has been posted.
- private PendingIoCache ioCache;
- private Queue<Long> dataBuffers;
- // the data buffer to receive the local/remote socket address
- // private final long dataBuffer;
- private AtomicInteger pendingAccept;
- private int maxPending;
- Method updateAcceptContextM;
- Method acceptM;
- WindowsMultiAcceptSupport() {
- //dummy for JNI code.
- }
- public void close() throws IOException {
- schannel.close();
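- // free the buffers currently in the pool; buffers still owned by in-flight
- // accepts are not in the queue, so poll() until it is empty instead of
- // assuming maxPending + 1 entries (poll() returns null on an empty queue)
- Long addr;
- while ((addr = dataBuffers.poll()) != null) {
- // release resources
- unsafe.freeMemory(addr);
- }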
- }
- /**
- *
- */
- public WindowsMultiAcceptSupport(AsynchronousServerSocketChannel ch, int maxPost) {
- if (maxPost <= 0 || maxPost > 1024)
- throw new IllegalArgumentException("maxPost must be between 1 and 1024");
- this.schannel = (WindowsAsynchronousServerSocketChannelImpl) ch;
- maxPending = maxPost;
- dataBuffers = new ConcurrentLinkedQueue<Long>();
- for (int i = 0; i < maxPending + 1; i++) {
- dataBuffers.add(unsafe.allocateMemory(ONE_DATA_BUFFER_SIZE));
- }
- pendingAccept = new AtomicInteger(0);
- try {
- Field f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("handle");
- f.setAccessible(true);
- handle = f.getLong(schannel);
- f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("iocp");
- f.setAccessible(true);
- iocp = (Iocp) f.get(schannel);
- f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("ioCache");
- f.setAccessible(true);
- ioCache = (PendingIoCache) f.get(schannel);
- f = WindowsAsynchronousServerSocketChannelImpl.class.getDeclaredField("accepting");
- f.setAccessible(true);
- AtomicBoolean accepting = (AtomicBoolean) f.get(schannel);
- accepting.set(true);//disable accepting by origin channel.
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- @SuppressWarnings("unchecked")
- public final <A> void accept(A attachment,
- CompletionHandler<AsynchronousSocketChannel, ? super A> handler) {
- if (handler == null)
- throw new NullPointerException("'handler' is null");
- implAccept(attachment, (CompletionHandler<AsynchronousSocketChannel, Object>) handler);
- }
- /**
- * Task to initiate accept operation and to handle result.
- */
- private class AcceptTask implements Runnable, Iocp.ResultHandler {
- private final WindowsAsynchronousSocketChannelImpl channel;
- private final AccessControlContext acc;
- private final PendingFuture<AsynchronousSocketChannel, Object> result;
- private final long dataBuffer;
- AcceptTask(WindowsAsynchronousSocketChannelImpl channel, AccessControlContext acc,
- long dataBuffer, PendingFuture<AsynchronousSocketChannel, Object> result) {
- this.channel = channel;
- this.acc = acc;
- this.result = result;
- this.dataBuffer = dataBuffer;
- }
- void enableAccept() {
- pendingAccept.decrementAndGet();
- dataBuffers.add(dataBuffer);
- }
- void closeChildChannel() {
- try {
- channel.close();
- } catch (IOException ignore) {
- }
- }
- // caller must have acquired read lock for the listener and child channel.
- void finishAccept() throws IOException {
- /**
- * JDK 7 makes four getsockname calls to set up the local and
- * remote addresses, which is very inefficient.
- *
- * This version uses GetAcceptExSockaddrs instead.
- */
- InetAddress[] socks = new InetAddress[2];
- int[] ports = new int[2];
- updateAcceptContext(handle, channel.handle(), socks, ports, dataBuffer);
- InetSocketAddress local = new InetSocketAddress(socks[0], ports[0]);
- final InetSocketAddress remote = new InetSocketAddress(socks[1], ports[1]);
- channel.setConnected(local, remote);
- // permission check (in context of initiating thread)
- if (acc != null) {
- AccessController.doPrivileged(new PrivilegedAction<Void>() {
- public Void run() {
- SecurityManager sm = System.getSecurityManager();
- sm.checkAccept(remote.getAddress().getHostAddress(), remote.getPort());
- return null;
- }
- }, acc);
- }
- }
- /**
- * Initiates the accept operation.
- */
- @Override
- public void run() {
- long overlapped = 0L;
- try {
- // begin usage of listener socket
- schannel.begin();
- try {
- // begin usage of child socket (as it is registered with
- // completion port and so may be closed in the event that
- // the group is forcefully closed).
- channel.begin();
- synchronized (result) {
- overlapped = ioCache.add(result);
- int n = accept0(handle, channel.handle(), overlapped, dataBuffer); // dataBuffer must not be freed or reused until this accept completes
- if (n == IOStatus.UNAVAILABLE) {
- return;
- }
- // connection accepted immediately
- finishAccept();
- // allow another accept before the result is set
- enableAccept();
- result.setResult(channel);
- }
- } finally {
- // end usage on child socket
- channel.end();
- }
- } catch (Throwable x) {
- // failed to initiate accept so release resources
- if (overlapped != 0L)
- ioCache.remove(overlapped);
- closeChildChannel();
- if (x instanceof ClosedChannelException)
- x = new AsynchronousCloseException();
- if (!(x instanceof IOException) && !(x instanceof SecurityException))
- x = new IOException(x);
- enableAccept();
- result.setFailure(x);
- } finally {
- // end of usage of listener socket
- schannel.end();
- }
- // accept completed immediately but may not have executed on
- // initiating thread in which case the operation may have been
- // cancelled.
- if (result.isCancelled()) {
- closeChildChannel();
- }
- // invoke completion handler
- Invoker.invokeIndirectly(result);
- }
- /**
- * Executed when the I/O has completed
- */
- @Override
- public void completed(int bytesTransferred, boolean canInvokeDirect) {
- try {
- // connection accept after group has shutdown
- if (iocp.isShutdown()) {
- throw new IOException(new ShutdownChannelGroupException());
- }
- // finish the accept
- try {
- schannel.begin();
- try {
- channel.begin();
- finishAccept();
- } finally {
- channel.end();
- }
- } finally {
- schannel.end();
- }
- // allow another accept before the result is set
- enableAccept();
- result.setResult(channel);
- } catch (Throwable x) {
- enableAccept();
- closeChildChannel();
- if (x instanceof ClosedChannelException)
- x = new AsynchronousCloseException();
- if (!(x instanceof IOException) && !(x instanceof SecurityException))
- x = new IOException(x);
- result.setFailure(x);
- }
- // if an async cancel has already cancelled the operation then
- // close the new channel so as to free resources
- if (result.isCancelled()) {
- closeChildChannel();
- }
- // invoke handler (but not directly)
- Invoker.invokeIndirectly(result);
- }
- @Override
- public void failed(int error, IOException x) {
- enableAccept();
- closeChildChannel();
- // release waiters
- if (schannel.isOpen()) {
- result.setFailure(x);
- } else {
- result.setFailure(new AsynchronousCloseException());
- }
- Invoker.invokeIndirectly(result);
- }
- }
- Future<AsynchronousSocketChannel> implAccept(Object attachment,
- final CompletionHandler<AsynchronousSocketChannel, Object> handler) {
- if (!schannel.isOpen()) {
- Throwable exc = new ClosedChannelException();
- if (handler == null)
- return CompletedFuture.withFailure(exc);
- Invoker.invokeIndirectly(schannel, handler, attachment, null, exc);
- return null;
- }
- if (schannel.isAcceptKilled())
- throw new RuntimeException("Accept not allowed due to cancellation");
- // ensure channel is bound to local address
- if (schannel.localAddress == null)
- throw new NotYetBoundException();
- // create the socket that will be accepted. The creation of the socket
- // is enclosed by a begin/end for the listener socket to ensure that
- // we check that the listener is open and also to prevent the I/O
- // port from being closed as the new socket is registered.
- WindowsAsynchronousSocketChannelImpl ch = null;
- IOException ioe = null;
- try {
- schannel.begin();
- ch = new WindowsAsynchronousSocketChannelImpl(iocp, false);
- } catch (IOException x) {
- ioe = x;
- } finally {
- schannel.end();
- }
- if (ioe != null) {
- if (handler == null)
- return CompletedFuture.withFailure(ioe);
- Invoker.invokeIndirectly(this.schannel, handler, attachment, null, ioe);
- return null;
- }
- // need calling context when there is security manager as
- // permission check may be done in a different thread without
- // any application call frames on the stack
- AccessControlContext acc =
- (System.getSecurityManager() == null) ? null : AccessController.getContext();
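- PendingFuture<AsynchronousSocketChannel, Object> result =
- new PendingFuture<AsynchronousSocketChannel, Object>(schannel, handler, attachment);
- // reserve one of the maxPending slots with a single atomic increment; a
- // separate get() followed by incrementAndGet() would let concurrent
- // callers exceed the limit
- if (pendingAccept.incrementAndGet() > maxPending) {
- pendingAccept.decrementAndGet();
- throw new AcceptPendingException();
- }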
- AcceptTask task = new AcceptTask(ch, acc, dataBuffers.poll(), result);
- result.setContext(task);
- // initiate I/O
- if (Iocp.supportsThreadAgnosticIo()) {
- task.run();
- } else {
- Invoker.invokeOnThreadInThreadPool(this.schannel, task);
- }
- return result;
- }
- // reimplemented natively for performance
- static native void updateAcceptContext(long listenSocket, long acceptSocket,
- InetAddress[] addresses, int[] ports, long dataBuffer) throws IOException;
- static native int accept0(long handle, long handle2, long overlapped, long dataBuffer);
- }
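The limit on outstanding accepts combines a counter with a pool of buffers. The bookkeeping can be sketched on its own, without any native code. This is an illustration under assumptions, not the author's class: `PendingAcceptLimiter` and its methods are hypothetical names, and plain `long` tokens stand in for the native buffer addresses. A slot is reserved with one atomic increment and rolled back on overshoot, and a buffer is returned to the pool before its slot is freed so that a caller holding a slot always finds a buffer.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone model of the pendingAccept / dataBuffers bookkeeping.
class PendingAcceptLimiter {
    private final AtomicInteger pending = new AtomicInteger();
    private final Queue<Long> buffers = new ConcurrentLinkedQueue<>();
    private final int maxPending;

    PendingAcceptLimiter(int maxPending) {
        this.maxPending = maxPending;
        for (long i = 1; i <= maxPending; i++) {
            buffers.add(i); // placeholder tokens instead of native addresses
        }
    }

    /** Returns a buffer token, or null when maxPending operations are already outstanding. */
    Long tryAcquire() {
        if (pending.incrementAndGet() > maxPending) {
            pending.decrementAndGet(); // roll back the failed reservation
            return null;
        }
        return buffers.poll();
    }

    /** Gives the buffer back first, then frees the slot. */
    void release(long buffer) {
        buffers.add(buffer);
        pending.decrementAndGet();
    }

    int pendingCount() {
        return pending.get();
    }

    public static void main(String[] args) {
        PendingAcceptLimiter limiter = new PendingAcceptLimiter(2);
        Long a = limiter.tryAcquire();
        Long b = limiter.tryAcquire();
        Long c = limiter.tryAcquire();
        System.out.println("third acquire refused: " + (c == null));
        limiter.release(a);
        System.out.println("acquire after release succeeds: " + (limiter.tryAcquire() != null));
    }
}
```

A `java.util.concurrent.Semaphore` with `tryAcquire()` would give the same bound; the counter form is used here only because it mirrors the fields in the listing.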
The corresponding C++ code is as follows:
- /*
- * Class: sun_nio_ch_WindowsMultiAcceptSupport
- * Method: updateAcceptContext
- * Signature: (JJ[Ljava/net/InetAddress;[IJ)V
- */
- JNIEXPORT void JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_updateAcceptContext
- (JNIEnv *env , jclass clazz, jlong listenSocket, jlong acceptSocket, jobjectArray sockArray,jintArray portArray,jlong buf)
- {
- SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);
- SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);
- PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);
- INT iLocalAddrLen=0;
- INT iRemoteAddrLen=0;
- SOCKETADDRESS* lpLocalAddr;
- SOCKETADDRESS* lpRemoteAddr;
- jobject localAddr;
- jobject remoteAddr;
- jint ports[2]={0};
- setsockopt(s2, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&s1, sizeof(s1));
- (lpGetAcceptExSockaddrs)(outputBuffer,
- 0,
- sizeof(SOCKETADDRESS)+16,
- sizeof(SOCKETADDRESS)+16,
- (LPSOCKADDR*)&lpLocalAddr,
- &iLocalAddrLen,
- (LPSOCKADDR*)&lpRemoteAddr,
- &iRemoteAddrLen);
- localAddr=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpLocalAddr,(int *)ports);
- remoteAddr=lpNET_SockaddrToInetAddress(env,(struct sockaddr *)lpRemoteAddr,(int *)(ports+1));
- env->SetObjectArrayElement(sockArray,0,localAddr);
- env->SetObjectArrayElement(sockArray,1,remoteAddr);
- env->SetIntArrayRegion(portArray,0,2,ports);
- }
- /*
- * Class: sun_nio_ch_WindowsMultiAcceptSupport
- * Method: accept0
- * Signature: (JJJJ)I
- */
- JNIEXPORT jint JNICALL Java_sun_nio_ch_WindowsMultiAcceptSupport_accept0
- (JNIEnv *env, jclass clazz, jlong listenSocket, jlong acceptSocket, jlong ov, jlong buf)
- {
- BOOL res;
- SOCKET s1 = (SOCKET)jlong_to_ptr(listenSocket);
- SOCKET s2 = (SOCKET)jlong_to_ptr(acceptSocket);
- PVOID outputBuffer = (PVOID)jlong_to_ptr(buf);
- DWORD nread = 0;
- OVERLAPPED* lpOverlapped = (OVERLAPPED*)jlong_to_ptr(ov);
- ZeroMemory((PVOID)lpOverlapped, sizeof(OVERLAPPED));
- //why use SOCKETADDRESS?
- //because client may use IPv6 to connect to server.
- res = (lpAcceptEx)(s1,
- s2,
- outputBuffer,
- 0,
- sizeof(SOCKETADDRESS)+16,
- sizeof(SOCKETADDRESS)+16,
- &nread,
-  
Article title: Handling Concurrent Connections in a Java 7 NIO2 High-Performance Web Server