Nio探险

Published: by Creative Commons Licence

  • Tags:

源码

一般NIO的代码如下:

/**
 * Minimal single-threaded NIO server: accepts connections on port 8888 and prints
 * every chunk of data a client sends.
 */
public class NIOTest {
    /**
     * Runs the accept/read loop forever.
     *
     * @param args unused
     * @throws IOException if the selector or one of the channels fails
     */
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(8888), 1000);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        ByteBuffer buf = ByteBuffer.allocate(4096);
        while (true) {
            int num = selector.select();
            if (num == 0) {
                continue;
            }
            for (SelectionKey key : selector.selectedKeys()) {
                SelectableChannel channel = key.channel();
                if (channel == serverChannel && key.isAcceptable()) {
                    SocketChannel sc = serverChannel.accept();
                    if (sc != null) {
                        sc.configureBlocking(false);
                        // Register for OP_READ only. A connected socket is writable almost
                        // all the time, so also registering OP_WRITE would make select()
                        // return immediately on every pass and spin the CPU.
                        sc.register(selector, SelectionKey.OP_READ);
                        System.out.println("Accept:" + sc);
                    }
                } else if (channel instanceof SocketChannel && key.isReadable()) {
                    SocketChannel sc = (SocketChannel) channel;
                    buf.clear();
                    int n = sc.read(buf);
                    if (n < 0) {
                        // End of stream: the peer closed the connection. Closing the
                        // channel also cancels its key; without this the key would be
                        // reported readable on every subsequent select().
                        sc.close();
                    } else if (n > 0) {
                        // Decode only the bytes just read (not the whole backing array,
                        // which still holds stale data) and use an explicit charset.
                        String s = new String(buf.array(), 0, n,
                                java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Read:" + s);
                    }
                }
            }
            // The selector never removes keys from the selected set itself.
            selector.selectedKeys().clear();
        }
    }
}

Nio中的核心元素一般就是Selector、Buffer、Channel。

我们先看一下Selector。

public abstract class Selector implements Closeable {
    /**
     * Opens a selector through the provider returned by
     * {@code SelectorProvider.provider()}.
     *
     * @return the selector created by that provider's {@code openSelector()}
     * @throws IOException if opening the selector fails
     */
    public static Selector open() throws IOException {
        SelectorProvider selectorProvider = SelectorProvider.provider();
        return selectorProvider.openSelector();
    }
}

要开启选择器,会需要调用SelectorProvider的静态工厂方法得到provider来创建一个它的实例。

public abstract class SelectorProvider {
    /**
     * Returns the shared provider instance, creating it on first use.
     *
     * <p>Lookup order on first use: the system-property provider, then a provider
     * loaded as a service, then {@code sun.nio.ch.DefaultSelectorProvider.create()}.
     *
     * @return the provider stored in the {@code provider} field
     */
    public static SelectorProvider provider() {
        synchronized (lock) {
            // Creation only happens while holding the lock, so the instance built by
            // an earlier call is reused and at most one provider is ever created.
            if (provider == null) {
                return AccessController.doPrivileged(
                    new PrivilegedAction<SelectorProvider>() {
                        public SelectorProvider run() {
                            // Short-circuit keeps the order: property first, service second.
                            boolean loaded =
                                loadProviderFromProperty() || loadProviderAsService();
                            if (!loaded) {
                                provider = sun.nio.ch.DefaultSelectorProvider.create();
                            }
                            return provider;
                        }
                    });
            }
            return provider;
        }
    }
}

加载Provider有三种形式,第一种是从系统属性中加载,这是一种SPI机制。你需要设置系统属性java.nio.channels.spi.SelectorProvider才可以,否则该方法会返回false。

    /**
     * Tries to instantiate the provider class named by the system property
     * {@code java.nio.channels.spi.SelectorProvider} and store it in {@code provider}.
     *
     * @return {@code false} if the property is not set, {@code true} once the
     *         provider has been instantiated
     * @throws ServiceConfigurationError if the class cannot be found, accessed or
     *         instantiated, or a security check fails
     */
    private static boolean loadProviderFromProperty() {
        String className = System.getProperty("java.nio.channels.spi.SelectorProvider");
        if (className == null) {
            return false;
        }
        try {
            ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
            Class<?> providerClass = Class.forName(className, true, systemLoader);
            provider = (SelectorProvider) providerClass.newInstance();
            return true;
        } catch (ClassNotFoundException | IllegalAccessException
                 | InstantiationException | SecurityException x) {
            throw new ServiceConfigurationError(null, x);
        }
    }

第二顺位就是从classpath下的META-INF/services/java.nio.channels.spi.SelectorProvider文件中读取类全限定名称,之后逐一尝试加载类实例(如果失败是由安全原因即SecurityException引起的,则忽略并继续尝试下一个,否则抛出异常)。这是java默认支持的SPI机制。

    /**
     * Tries to obtain a provider through {@code ServiceLoader}, using the system
     * class loader, and stores the first one that loads in {@code provider}.
     *
     * @return {@code true} if a provider was loaded, {@code false} if the iterator
     *         is exhausted
     * @throws ServiceConfigurationError if loading fails for any reason other than
     *         a {@code SecurityException}
     */
    private static boolean loadProviderAsService() {
        Iterator<SelectorProvider> candidates =
            ServiceLoader.load(SelectorProvider.class,
                               ClassLoader.getSystemClassLoader())
                         .iterator();
        while (true) {
            try {
                if (candidates.hasNext()) {
                    provider = candidates.next();
                    return true;
                }
                return false;
            } catch (ServiceConfigurationError sce) {
                boolean securityFailure = sce.getCause() instanceof SecurityException;
                if (!securityFailure) {
                    throw sce;
                }
                // Ignore the security exception, try the next provider
            }
        }
    }

如果上面两种方案都调用失败,就会调用默认的sun.nio.ch.DefaultSelectorProvider类来创建具体的SelectorProvider。而这个类在不同操作系统下是不同的,我们来看一下在linux环境下的实现类。

/**
 * Creates this platform's default SelectorProvider
 */

public class DefaultSelectorProvider {

    /** Not instantiable; only the static members are used. */
    private DefaultSelectorProvider() { }

    /**
     * Creates a provider reflectively from its class name.
     *
     * @param cn fully qualified name of the provider class
     * @return a new instance of that class
     * @throws AssertionError if the class is not found or cannot be instantiated
     */
    @SuppressWarnings("unchecked")
    private static SelectorProvider createProvider(String cn) {
        final Class<SelectorProvider> providerClass;
        try {
            providerClass = (Class<SelectorProvider>) Class.forName(cn);
        } catch (ClassNotFoundException x) {
            throw new AssertionError(x);
        }
        // Kept as a separate try so only instantiation failures are translated here.
        try {
            return providerClass.newInstance();
        } catch (InstantiationException | IllegalAccessException x) {
            throw new AssertionError(x);
        }
    }

    /**
     * Returns the default SelectorProvider, chosen by the {@code os.name} property.
     */
    public static SelectorProvider create() {
        String os = AccessController.doPrivileged(new GetPropertyAction("os.name"));
        switch (os) {
            case "SunOS":
                return createProvider("sun.nio.ch.DevPollSelectorProvider");
            case "Linux":
                return createProvider("sun.nio.ch.EPollSelectorProvider");
            default:
                return new sun.nio.ch.PollSelectorProvider();
        }
    }

}

可以看到,在linux系统下,会查找sun.nio.ch.EPollSelectorProvider类型,并创建它的实例。它的实现非常简单:

public class EPollSelectorProvider extends SelectorProviderImpl {

    /**
     * Creates a new {@code EPollSelectorImpl} bound to this provider.
     *
     * @return the newly created selector
     * @throws IOException if the selector cannot be created
     */
    public AbstractSelector openSelector() throws IOException {
        EPollSelectorImpl selector = new EPollSelectorImpl(this);
        return selector;
    }

    /**
     * Returns whatever {@code InheritedChannel.getChannel()} returns.
     *
     * @throws IOException if the lookup fails
     */
    public Channel inheritedChannel() throws IOException {
        Channel inherited = InheritedChannel.getChannel();
        return inherited;
    }
}

拿到SelectorProvider的下一步就是调用它的openSelector,我们来看一下它具体的实现。它创建了一个EPollSelectorImpl实例,很显然这个实例用的是linux下的epoll多路复用机制做的。这个类只有包访问权限。

class EPollSelectorImpl
    extends SelectorImpl
{
    /**
     * Builds the selector: creates a pipe, splits its two descriptors into
     * {@code fd0}/{@code fd1}, hands both to a new {@code EPollArrayWrapper},
     * and creates the descriptor-to-key map.
     *
     * @param sp the provider passed on to the superclass constructor
     */
    EPollSelectorImpl(SelectorProvider sp) {
        super(sp);
        // makePipe returns both descriptors packed into one long:
        // the high 32 bits become fd0, the low 32 bits become fd1.
        long pipeFds = IOUtil.makePipe(false);
        fd0 = (int) (pipeFds >>> 32);
        fd1 = (int) pipeFds;
        // Create the wrapper around the epoll event array
        pollWrapper = new EPollArrayWrapper();
        // NOTE(review): presumably registers the pipe as the wakeup/interrupt
        // channel of the wrapper; confirm in EPollArrayWrapper.initInterrupt
        pollWrapper.initInterrupt(fd0, fd1);
        // fdToKey is the index from file descriptor to its key
        fdToKey = new HashMap<Integer,SelectionKeyImpl>();
    }
}

之后了解一下Channel。我们知道SelectableChannel可以注册到我们的Selector上。具体代码如下:

public abstract class AbstractSelectableChannel
    extends SelectableChannel
{
    /**
     * Registers this channel with {@code sel}, or updates the existing
     * registration if this channel already has a key for that selector.
     *
     * @param sel the selector to register with
     * @param ops the interest set for the key
     * @param att the attachment for the key
     * @return the existing key (updated) or the newly created key
     * @throws ClosedChannelException if this channel is not open
     * @throws IllegalArgumentException if {@code ops} has bits outside {@code validOps()}
     * @throws IllegalBlockingModeException if the channel is in blocking mode
     */
    public final SelectionKey register(Selector sel, int ops,
                                       Object att)
        throws ClosedChannelException
    {
        synchronized (regLock) {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }
            boolean hasUnsupportedOps = (ops & ~validOps()) != 0;
            if (hasUnsupportedOps) {
                throw new IllegalArgumentException();
            }
            if (blocking) {
                throw new IllegalBlockingModeException();
            }

            SelectionKey existing = findKey(sel);
            if (existing != null) {
                // Already registered: set the interest ops, then bind the attachment
                existing.interestOps(ops);
                existing.attach(att);
                return existing;
            }

            // New registration
            synchronized (keyLock) {
                if (!isOpen()) {
                    throw new ClosedChannelException();
                }
                SelectionKey created = ((AbstractSelector) sel).register(this, ops, att);
                addKey(created);
                return created;
            }
        }
    }
}

具体的逻辑还是会回到Selector#register方法中。

public abstract class SelectorImpl extends AbstractSelector {
    /**
     * Creates a key for the channel, registers it via {@code implRegister}, and
     * then applies the interest ops.
     *
     * @param var1 the channel being registered
     * @param var2 the interest ops to set on the new key
     * @param var3 the attachment for the new key
     * @return the newly created key
     * @throws IllegalSelectorException if the channel is not a {@code SelChImpl}
     */
    protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
        // The channel must be a SelChImpl
        if (!(var1 instanceof SelChImpl)) {
            throw new IllegalSelectorException();
        }

        SelectionKeyImpl key = new SelectionKeyImpl((SelChImpl) var1, this);
        key.attach(var3);
        synchronized (this.publicKeys) {
            this.implRegister(key);
        }

        // Interest ops are applied only after the key has been registered.
        key.interestOps(var2);
        return key;
    }
}

而实现注册的代码在implRegister中,一窥庐山真面目。

class EPollSelectorImpl extends SelectorImpl {
    /**
     * Records a new key: indexes it by file descriptor, queues its channel on
     * the poll wrapper, and adds it to the key set.
     *
     * @param ski the key being registered
     * @throws ClosedSelectorException if {@code closed} is set
     */
    protected void implRegister(SelectionKeyImpl ski) {
        if (closed) {
            throw new ClosedSelectorException();
        }
        SelChImpl channel = ski.channel;
        Integer fd = Integer.valueOf(channel.getFDVal());
        fdToKey.put(fd, ski);
        pollWrapper.add(channel);
        keys.add(ski);
    }
}

其逻辑是转发给了pollWrapper

class EPollArrayWrapper {
    /**
     * Queues an {@code EPOLL_CTL_ADD} update for the channel on {@code updateList}.
     *
     * @param channel the channel to queue
     */
    void add(SelChImpl channel) {
        synchronized (updateList) {
            Updator pendingAdd = new Updator(channel, EPOLL_CTL_ADD);
            updateList.add(pendingAdd);
        }
    }
}

它只是暂时把通道(Channel)的注册信息加入到updateList中。

再看一下selector中的select的逻辑。它默认会阻塞直到有至少一个套接字可用,当然你也可以使用selectNow,它是非阻塞的。

public abstract class SelectorImpl extends AbstractSelector {
    /**
     * Selects with a timeout argument of 0.
     */
    public int select() throws IOException {
        return this.select(0L);
    }

    /**
     * Selects with the given timeout.
     *
     * @param var1 the timeout; 0 is passed on as -1
     * @return the result of {@code doSelect}
     * @throws IllegalArgumentException if the timeout is negative
     */
    public int select(long var1) throws IOException {
        if (var1 < 0L) {
            throw new IllegalArgumentException("Negative timeout");
        }
        // A timeout of 0 means "never time out" and is passed on as -1
        long effectiveTimeout = (var1 == 0L) ? -1L : var1;
        return this.lockAndDoSelect(effectiveTimeout);
    }

    /**
     * Runs {@code doSelect} while holding, in this order, the monitors of this
     * selector, {@code publicKeys} and {@code publicSelectedKeys}.
     *
     * @throws ClosedSelectorException if the selector is not open
     */
    private int lockAndDoSelect(long var1) throws IOException {
        synchronized (this) {
            if (!this.isOpen()) {
                throw new ClosedSelectorException();
            }
            synchronized (this.publicKeys) {
                synchronized (this.publicSelectedKeys) {
                    return this.doSelect(var1);
                }
            }
        }
    }
}

看一下doSelect的具体逻辑。

class EPollSelectorImpl
    extends SelectorImpl
{
    /**
     * Runs one selection pass: polls through {@code pollWrapper}, updates the
     * selected keys, and resets the wakeup state if the poll was interrupted.
     *
     * @param timeout passed unchanged to {@code pollWrapper.poll}
     * @return the count returned by {@code updateSelectedKeys()}
     * @throws ClosedSelectorException if {@code closed} is set
     * @throws IOException if polling fails
     */
    protected int doSelect(long timeout)
        throws IOException
    {
        if (closed)
            throw new ClosedSelectorException();
        // The deregister queue is processed both before and after polling.
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(timeout);
        } finally {
            // end() always runs, even if poll throws
            end();
        }
        processDeregisterQueue();
        // This step puts the ready keys into the selectedKeys set
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }
}

EPollArrayWrapper中,它的底层先会把在updateList中的注册对象,通过epollCtl进行注册。

之后比较核心的就是调用epollWait去获得就绪的文件描述符。

class EPollArrayWrapper {
    /**
     * Drains {@code updateList} and applies each pending update through
     * {@code epollCtl}. Updates for closed channels are skipped.
     */
    void updateRegistrations() {
        synchronized (updateList) {
            Updator u = null;
            while ((u = updateList.poll()) != null) {
                SelChImpl ch = u.channel;
                if (!ch.isOpen())
                    continue;

                // if the events are 0 then file descriptor is put into "idle
                // set" to prevent it being polled
                if (u.events == 0) {
                    boolean added = idleSet.add(u.channel);
                    // if added to idle set then remove from epoll if registered
                    if (added && (u.opcode == EPOLL_CTL_MOD))
                        epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
                } else {
                    // events are specified. If file descriptor was in idle set
                    // it must be re-registered (by converting opcode to ADD)
                    boolean idle = false;
                    if (!idleSet.isEmpty())
                        idle = idleSet.remove(u.channel);
                    int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
                    epollCtl(epfd, opcode, ch.getFDVal(), u.events);
                }
            }
        }
    }
    /**
     * Applies pending registrations, waits through {@code epollWait}, and
     * notes whether the interrupt descriptor is among the results.
     *
     * @param timeout passed unchanged to {@code epollWait}
     * @return the value returned by {@code epollWait}, also stored in {@code updated}
     * @throws IOException declared by this method
     */
    int poll(long timeout) throws IOException {
        updateRegistrations();
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        // Look for the interrupt descriptor; only the first match is recorded.
        for (int i=0; i<updated; i++) {
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }
}

其中epollCtl和epollWait都是调用的本地方法,其JVM源码如下:

/*
 * Native side of EPollArrayWrapper.epollCtl: issues one epoll_ctl call with
 * the given opcode for fd on epfd, using events as the event mask.
 * Throws an IOException on failure unless errno is EBADF, ENOENT or EPERM,
 * which are deliberately ignored (see the comment in the body).
 */
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    /* NOTE(review): RESTARTABLE presumably retries the call on EINTR;
     * confirm against the macro definition. */
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}

/*
 * Native side of EPollArrayWrapper.epollWait: waits on epfd for up to
 * numfds events, written to the epoll_event array at the given address.
 * A timeout <= 0 goes directly to epoll_wait; a positive timeout goes
 * through iepoll. Returns the result of the wait, or throws an
 * IOException when that result is negative.
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    /* The Java side passes the event array as a raw address. */
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}

最后再看一下,怎么获得就绪的连接,即selectedKeys

public abstract class SelectorImpl extends AbstractSelector {
    /**
     * Returns the public view of the selected-key set.
     *
     * @return {@code publicSelectedKeys}, without copying
     * @throws ClosedSelectorException if the selector is not open and
     *         {@code Util.atBugLevel("1.4")} is false
     */
    public Set<SelectionKey> selectedKeys() {
        boolean rejectClosed = !this.isOpen() && !Util.atBugLevel("1.4");
        if (rejectClosed) {
            throw new ClosedSelectorException();
        }
        // Returned directly
        return this.publicSelectedKeys;
    }
}

可以发现所有就绪的连接都存在publicSelectedKeys这个对象中,这个对象实际上是成员selectedKeys的一个代理,不允许执行插入操作,其余操作允许,它在构造器中创建。

public abstract class SelectorImpl extends AbstractSelector {
    /**
     * Sets up the public views of the key sets.
     *
     * <p>Normally {@code publicKeys} is an unmodifiable view of {@code keys} and
     * {@code publicSelectedKeys} is an ungrowable view of {@code selectedKeys};
     * when {@code Util.atBugLevel("1.4")} is true the raw sets are exposed instead.
     *
     * @param var1 the provider passed on to the superclass constructor
     */
    protected SelectorImpl(SelectorProvider var1) {
        super(var1);
        if (!Util.atBugLevel("1.4")) {
            this.publicKeys = Collections.unmodifiableSet(this.keys);
            this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
        } else {
            this.publicKeys = this.keys;
            this.publicSelectedKeys = this.selectedKeys;
        }
    }
}

selectedKeys在doSelect的时候更新。看一下具体的放入代码:

class EPollSelectorImpl
    extends SelectorImpl
{
    /**
     * Runs one selection pass: polls through {@code pollWrapper}, updates the
     * selected keys, and resets the wakeup state if the poll was interrupted.
     *
     * @param timeout passed unchanged to {@code pollWrapper.poll}
     * @return the count returned by {@code updateSelectedKeys()}
     * @throws ClosedSelectorException if {@code closed} is set
     * @throws IOException declared by this method
     */
    protected int doSelect(long timeout)
        throws IOException
    {
        if (closed)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(timeout);
        } finally {
            // end() always runs, even if poll throws
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }

    /**
     * Walks the {@code pollWrapper.updated} entries of the last poll and
     * updates the ready ops of the matching keys, adding keys to
     * {@code selectedKeys} where needed.
     *
     * @return the number of keys counted as updated
     */
    private int updateSelectedKeys() {
        int entries = pollWrapper.updated;
        int numKeysUpdated = 0;
        for (int i=0; i<entries; i++) {
            int nextFD = pollWrapper.getDescriptor(i);
            SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
            // ski is null in the case of an interrupt
            if (ski != null) {
                int rOps = pollWrapper.getEventOps(i);
                // Key is already in the selected set: only refresh its ready ops,
                // and count it when translateAndSetReadyOps returns true
                if (selectedKeys.contains(ski)) {
                    if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                        numKeysUpdated++;
                    }
                } else {
                    ski.channel.translateAndSetReadyOps(rOps, ski);
                    // Add the key only if at least one op it is interested in is ready
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        selectedKeys.add(ski);
                        numKeysUpdated++;
                    }
                }
            }
        }
        return numKeysUpdated;
    }
}