Nio探险
源码
一般NIO的代码如下:
/**
 * Minimal single-threaded NIO server demo: listens on port 8888, accepts
 * connections and prints whatever each client sends.
 */
public class NIOTest {
    /**
     * Runs the accept/read loop forever.
     *
     * @param args unused
     * @throws IOException if the selector or the server socket cannot be set up,
     *                     or if selecting/accepting/closing fails
     */
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(8888), 1000);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        ByteBuffer buf = ByteBuffer.allocate(4096);
        while (true) {
            int num = selector.select();
            if (num == 0) {
                continue;
            }
            for (SelectionKey key : selector.selectedKeys()) {
                // A key may have been cancelled (channel closed) since it was selected.
                if (!key.isValid()) {
                    continue;
                }
                SelectableChannel channel = key.channel();
                if (channel == serverChannel && key.isAcceptable()) {
                    SocketChannel sc = serverChannel.accept();
                    if (sc != null) {
                        sc.configureBlocking(false);
                        // Only register for reads: a socket is almost always writable, so
                        // OP_WRITE would make select() return immediately on every pass.
                        sc.register(selector, SelectionKey.OP_READ);
                        System.out.println("Accept:" + sc);
                    }
                } else if (channel instanceof SocketChannel && key.isReadable()) {
                    SocketChannel sc = (SocketChannel) channel;
                    buf.clear();
                    int bytesRead;
                    try {
                        bytesRead = sc.read(buf);
                    } catch (IOException e) {
                        // A broken connection must not take the whole server down;
                        // treat it like end-of-stream and drop this client only.
                        System.out.println("Read failed:" + sc + " " + e);
                        bytesRead = -1;
                    }
                    if (bytesRead < 0) {
                        // End of stream: closing the channel also cancels its key,
                        // otherwise the key stays readable forever and the loop spins.
                        sc.close();
                        System.out.println("Closed:" + sc);
                    } else if (bytesRead > 0) {
                        // Decode only the bytes just read, not the whole backing array
                        // (which would include zero padding and stale data).
                        String s = new String(buf.array(), 0, bytesRead,
                                java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("Read:" + s);
                    }
                }
            }
            // The selector never removes keys from the selected set itself.
            selector.selectedKeys().clear();
        }
    }
}
Nio中的核心元素一般就是Selector、Buffer、Channel。
我们先看一下Selector。
/**
 * Excerpt of the selector base class showing only its static factory.
 */
public abstract class Selector implements Closeable {
    /**
     * Opens a selector by asking the default {@code SelectorProvider} for one.
     *
     * @return the newly opened selector
     * @throws IOException if the provider fails to open a selector
     */
    public static Selector open() throws IOException {
        SelectorProvider defaultProvider = SelectorProvider.provider();
        return defaultProvider.openSelector();
    }
}
要开启选择器,会需要调用SelectorProvider
的静态工厂方法得到provider
来创建一个它的实例。
/**
 * Excerpt of the provider base class showing how the default provider is resolved.
 */
public abstract class SelectorProvider {
    /**
     * Returns the provider, creating it on first use.
     *
     * <p>Resolution order: system property, then service lookup, then the
     * platform default from {@code sun.nio.ch.DefaultSelectorProvider}.
     *
     * @return the provider instance
     */
    public static SelectorProvider provider() {
        synchronized (lock) {
            // Reuse the provider created by an earlier call. Because the whole lookup
            // runs while holding the lock, only one provider instance is ever created.
            if (provider == null) {
                return AccessController.doPrivileged(
                    new PrivilegedAction<SelectorProvider>() {
                        public SelectorProvider run() {
                            // Short-circuit keeps the original order: the service lookup
                            // is attempted only if the property lookup found nothing.
                            boolean loaded = loadProviderFromProperty()
                                    || loadProviderAsService();
                            if (!loaded) {
                                provider = sun.nio.ch.DefaultSelectorProvider.create();
                            }
                            return provider;
                        }
                    });
            }
            return provider;
        }
    }
}
加载Provider
有三种形式,第一种就是从系统属性中加载,这是一种SPI机制。你需要设置系统属性java.nio.channels.spi.SelectorProvider
才可以,否则该方法会返回false。
/**
 * Tries to instantiate the provider named by the system property
 * {@code java.nio.channels.spi.SelectorProvider}.
 *
 * @return {@code true} if the property was set and {@code provider} was assigned,
 *         {@code false} if the property is not set
 * @throws ServiceConfigurationError if the named class cannot be loaded or instantiated
 */
private static boolean loadProviderFromProperty() {
    String className = System.getProperty("java.nio.channels.spi.SelectorProvider");
    if (className == null) {
        return false;
    }
    try {
        ClassLoader systemLoader = ClassLoader.getSystemClassLoader();
        Class<?> providerClass = Class.forName(className, true, systemLoader);
        provider = (SelectorProvider) providerClass.newInstance();
        return true;
    } catch (ClassNotFoundException | IllegalAccessException
            | InstantiationException | SecurityException x) {
        // Every failure mode is reported the same way, with the cause preserved.
        throw new ServiceConfigurationError(null, x);
    }
}
第二顺位就是从类路径下的META-INF/services/java.nio.channels.spi.SelectorProvider
的文件中读取类全限定名称,之后逐一尝试加载类实例(如果加载失败是由SecurityException引起的,则忽略并继续尝试下一个,否则直接抛出异常)。这是java默认支持的SPI机制。
/**
 * Tries to obtain a provider through {@link ServiceLoader}, using the system
 * class loader.
 *
 * @return {@code true} if a provider was found and {@code provider} was assigned,
 *         {@code false} if the iterator is exhausted without yielding one
 * @throws ServiceConfigurationError if loading fails for a reason other than a
 *         {@link SecurityException}
 */
private static boolean loadProviderAsService() {
    ServiceLoader<SelectorProvider> loader =
        ServiceLoader.load(SelectorProvider.class, ClassLoader.getSystemClassLoader());
    Iterator<SelectorProvider> candidates = loader.iterator();
    while (true) {
        try {
            if (candidates.hasNext()) {
                provider = candidates.next();
                return true;
            }
            return false;
        } catch (ServiceConfigurationError sce) {
            if (!(sce.getCause() instanceof SecurityException)) {
                throw sce;
            }
            // Security failure: skip this candidate and try the next one.
        }
    }
}
如果上面两种方案都调用失败,就会调用默认的sun.nio.ch.DefaultSelectorProvider
类来创建具体的SelectorProvider
。而这个类在不同操作系统下是不同的,我们来看一下在linux
环境下的实现类。
/**
 * Factory for the platform's default SelectorProvider.
 */
public class DefaultSelectorProvider {

    /** Not instantiable; static factory only. */
    private DefaultSelectorProvider() { }

    /**
     * Instantiates the named provider class reflectively.
     *
     * @param cn fully qualified class name of the provider
     * @return a new instance of that class
     * @throws AssertionError if the class cannot be found or instantiated
     */
    @SuppressWarnings("unchecked")
    private static SelectorProvider createProvider(String cn) {
        final Class<SelectorProvider> providerClass;
        try {
            providerClass = (Class<SelectorProvider>) Class.forName(cn);
        } catch (ClassNotFoundException notFound) {
            throw new AssertionError(notFound);
        }
        try {
            return providerClass.newInstance();
        } catch (IllegalAccessException | InstantiationException failure) {
            throw new AssertionError(failure);
        }
    }

    /**
     * Picks the provider implementation from the {@code os.name} system property.
     *
     * @return the default SelectorProvider for this operating system
     */
    public static SelectorProvider create() {
        final String osName = AccessController
            .doPrivileged(new GetPropertyAction("os.name"));
        switch (osName) {
            case "SunOS":
                return createProvider("sun.nio.ch.DevPollSelectorProvider");
            case "Linux":
                return createProvider("sun.nio.ch.EPollSelectorProvider");
            default:
                return new sun.nio.ch.PollSelectorProvider();
        }
    }
}
可以看到,在linux
系统下,会查找sun.nio.ch.EPollSelectorProvider
类型,并创建它的实例。它的实现非常简单:
/**
 * Provider whose selectors are {@code EPollSelectorImpl} instances.
 */
public class EPollSelectorProvider extends SelectorProviderImpl {

    /**
     * Creates a new selector bound to this provider.
     *
     * @return a fresh {@code EPollSelectorImpl}
     * @throws IOException if the selector cannot be created
     */
    public AbstractSelector openSelector() throws IOException {
        AbstractSelector epollSelector = new EPollSelectorImpl(this);
        return epollSelector;
    }

    /**
     * Returns whatever {@code InheritedChannel.getChannel()} reports.
     *
     * @throws IOException if the lookup fails
     */
    public Channel inheritedChannel() throws IOException {
        Channel inherited = InheritedChannel.getChannel();
        return inherited;
    }
}
拿到SelectorProvider
的下一步就是调用它的openSelector
,我们来看一下它具体的实现。它创建了一个EPollSelectorImpl
实例,很显然这个实例用的是linux下的epoll
多路复用机制做的。这个类只有包访问权限。
/**
 * Excerpt of the epoll-backed selector (package-private): constructor only.
 */
class EPollSelectorImpl
extends SelectorImpl
{
/**
 * Sets up the selector: creates a pipe, unpacks its two descriptors, builds the
 * epoll array wrapper and the descriptor-to-key index.
 *
 * @param sp the provider that created this selector
 */
EPollSelectorImpl(SelectorProvider sp) {
super(sp);
// makePipe returns both pipe descriptors packed into a single long
long pipeFds = IOUtil.makePipe(false);
// upper 32 bits -> fd0, lower 32 bits -> fd1
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// Create the wrapper around the epoll event array
pollWrapper = new EPollArrayWrapper();
// NOTE(review): presumably the pipe is the wakeup/interrupt channel — confirm in EPollArrayWrapper
pollWrapper.initInterrupt(fd0, fd1);
// fdToKey is the index from a file descriptor value to its selection key
fdToKey = new HashMap<Integer,SelectionKeyImpl>();
}
}
之后了解一下Channel。我们知道SelectableChannel
可以注册到我们的Selector
上。具体代码如下:
/**
 * Excerpt of the selectable-channel base class showing registration.
 */
public abstract class AbstractSelectableChannel extends SelectableChannel {

    /**
     * Registers this channel with the given selector, or updates the existing
     * registration if {@code findKey(sel)} already returns a key.
     *
     * @param sel the selector to register with
     * @param ops the interest set
     * @param att the attachment to bind to the key
     * @return the existing key (updated) or the newly created key
     * @throws ClosedChannelException if this channel is not open
     * @throws IllegalArgumentException if {@code ops} has bits outside {@code validOps()}
     * @throws IllegalBlockingModeException if the channel is in blocking mode
     */
    public final SelectionKey register(Selector sel, int ops, Object att)
            throws ClosedChannelException {
        synchronized (regLock) {
            if (!isOpen()) {
                throw new ClosedChannelException();
            }
            if ((ops & ~validOps()) != 0) {
                throw new IllegalArgumentException();
            }
            if (blocking) {
                throw new IllegalBlockingModeException();
            }

            SelectionKey existing = findKey(sel);
            if (existing != null) {
                // Already registered: refresh the interest set, then the attachment.
                existing.interestOps(ops);
                existing.attach(att);
                return existing;
            }

            // First registration with this selector.
            synchronized (keyLock) {
                if (!isOpen()) {
                    throw new ClosedChannelException();
                }
                SelectionKey created = ((AbstractSelector) sel).register(this, ops, att);
                addKey(created);
                return created;
            }
        }
    }
}
具体的逻辑还是会回到Selector#register
方法中。
/**
 * Excerpt of the selector implementation base class showing key creation.
 */
public abstract class SelectorImpl extends AbstractSelector {

    /**
     * Creates a selection key for the channel and hands it to {@code implRegister}.
     *
     * @param var1 the channel being registered; must be a {@code SelChImpl}
     * @param var2 the interest set, applied after the key is registered
     * @param var3 the attachment
     * @return the new key
     * @throws IllegalSelectorException if the channel is not a {@code SelChImpl}
     */
    protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
        // Only channels implementing SelChImpl can be registered here.
        if (!(var1 instanceof SelChImpl)) {
            throw new IllegalSelectorException();
        }
        SelectionKeyImpl newKey = new SelectionKeyImpl((SelChImpl) var1, this);
        newKey.attach(var3);
        synchronized (this.publicKeys) {
            this.implRegister(newKey);
        }
        newKey.interestOps(var2);
        return newKey;
    }
}
而实现注册的代码在implRegister
中,一窥庐山真面目。
/**
 * Excerpt of the epoll-backed selector showing key registration.
 */
class EPollSelectorImpl extends SelectorImpl {

    /**
     * Records the key under its channel's descriptor, queues the channel on the
     * poll wrapper and adds the key to {@code keys}.
     *
     * @param ski the key to register
     * @throws ClosedSelectorException if this selector is closed
     */
    protected void implRegister(SelectionKeyImpl ski) {
        if (closed) {
            throw new ClosedSelectorException();
        }
        SelChImpl registered = ski.channel;
        Integer descriptor = Integer.valueOf(registered.getFDVal());
        fdToKey.put(descriptor, ski);
        pollWrapper.add(registered);
        keys.add(ski);
    }
}
其逻辑是转发给了pollWrapper
。
/**
 * Excerpt of the epoll array wrapper showing how a channel is queued.
 */
class EPollArrayWrapper {

    /**
     * Queues an {@code EPOLL_CTL_ADD} update for the channel on {@code updateList}.
     *
     * @param channel the channel to add
     */
    void add(SelChImpl channel) {
        synchronized (updateList) {
            Updator pendingAdd = new Updator(channel, EPOLL_CTL_ADD);
            updateList.add(pendingAdd);
        }
    }
}
它只是暂时把通道(Channel)信息加入到updateList中,并没有立即向epoll注册。
再看一下selector中的select
的逻辑。它默认会阻塞直到有至少一个套接字可用,当然你也可以使用selectNow
,它是非阻塞的。
/**
 * Excerpt of the selector implementation base class showing the select path.
 */
public abstract class SelectorImpl extends AbstractSelector {

    /**
     * Selects with a timeout argument of zero.
     *
     * @return the value returned by {@code select(0L)}
     * @throws IOException if the underlying select fails
     */
    public int select() throws IOException {
        return this.select(0L);
    }

    /**
     * Selects with the given timeout.
     *
     * @param var1 timeout; zero is translated to -1 before being passed on
     * @return the value returned by {@code doSelect}
     * @throws IllegalArgumentException if the timeout is negative
     * @throws IOException if the underlying select fails
     */
    public int select(long var1) throws IOException {
        if (var1 < 0L) {
            throw new IllegalArgumentException("Negative timeout");
        }
        // A zero timeout means "never time out", passed down as -1.
        long effectiveTimeout = (var1 == 0L) ? -1L : var1;
        return this.lockAndDoSelect(effectiveTimeout);
    }

    /**
     * Runs {@code doSelect} while holding, in order, the locks on this selector,
     * {@code publicKeys} and {@code publicSelectedKeys}.
     *
     * @throws ClosedSelectorException if the selector is not open
     */
    private int lockAndDoSelect(long var1) throws IOException {
        synchronized (this) {
            if (!this.isOpen()) {
                throw new ClosedSelectorException();
            }
            synchronized (this.publicKeys) {
                synchronized (this.publicSelectedKeys) {
                    return this.doSelect(var1);
                }
            }
        }
    }
}
看一下doSelect
的具体逻辑。
/**
 * Excerpt of the epoll-backed selector showing the core select step.
 */
class EPollSelectorImpl
extends SelectorImpl
{
/**
 * Performs one selection pass: polls through the wrapper, refreshes the
 * selected-key set, and resets the interrupt state if the wrapper reports one.
 *
 * @param timeout passed unchanged to {@code pollWrapper.poll}
 * @return the count returned by {@code updateSelectedKeys()}
 * @throws ClosedSelectorException if this selector is closed
 * @throws IOException if polling fails
 */
protected int doSelect(long timeout)
throws IOException
{
if (closed)
throw new ClosedSelectorException();
// deregister queue is processed both before and after the poll
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
// end() runs even if poll throws
end();
}
processDeregisterQueue();
// This step puts the ready selection keys into the selectedKeys set
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
}
在EPollArrayWrapper
中,它的底层先会把在updateList
中的注册对象,通过epollCtl
进行注册。
之后比较核心的就是调用epollWait
去获得就绪的文件描述符。
/**
 * Excerpt of the epoll array wrapper showing how queued updates are applied
 * and how the wait is performed.
 */
class EPollArrayWrapper {
/**
 * Drains {@code updateList} and applies each queued update through
 * {@code epollCtl}. Updates for channels that are no longer open are skipped.
 */
void updateRegistrations() {
synchronized (updateList) {
Updator u = null;
while ((u = updateList.poll()) != null) {
SelChImpl ch = u.channel;
// closed channel: drop the update
if (!ch.isOpen())
continue;
// if the events are 0 then file descriptor is put into "idle
// set" to prevent it being polled
if (u.events == 0) {
boolean added = idleSet.add(u.channel);
// if added to idle set then remove from epoll if registered
if (added && (u.opcode == EPOLL_CTL_MOD))
epollCtl(epfd, EPOLL_CTL_DEL, ch.getFDVal(), 0);
} else {
// events are specified. If file descriptor was in idle set
// it must be re-registered (by converting opcode to ADD)
boolean idle = false;
if (!idleSet.isEmpty())
idle = idleSet.remove(u.channel);
int opcode = (idle) ? EPOLL_CTL_ADD : u.opcode;
epollCtl(epfd, opcode, ch.getFDVal(), u.events);
}
}
}
}
/**
 * Applies pending registrations, waits via {@code epollWait}, then scans the
 * returned entries for the interrupt descriptor.
 *
 * @param timeout passed unchanged to {@code epollWait}
 * @return the number of entries reported by {@code epollWait}
 * @throws IOException if the wait fails
 */
int poll(long timeout) throws IOException {
updateRegistrations();
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
// remember the index of the first entry that matches the interrupt descriptor
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
}
其中epollCtl
和epollWait
都是调用的本地方法,其JVM源码如下:
/*
 * Native side of EPollArrayWrapper.epollCtl: fills an epoll_event with the
 * given events and fd, calls epoll_ctl through RESTARTABLE, and throws an
 * IOException for any failure other than EBADF, ENOENT or EPERM.
 */
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
/*
* A channel may be registered with several Selectors. When each Selector
* is polled a EPOLL_CTL_DEL op will be inserted into its pending update
* list to remove the file descriptor from epoll. The "last" Selector will
* close the file descriptor which automatically unregisters it from each
* epoll descriptor. To avoid costly synchronization between Selectors we
* allow pending updates to be processed, ignoring errors. The errors are
* harmless as the last update for the file descriptor is guaranteed to
* be EPOLL_CTL_DEL.
*/
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
/*
 * Native side of EPollArrayWrapper.epollWait: waits for events on epfd and
 * writes them to the event array at `address`. Returns the result of the
 * wait; throws an IOException if that result is negative.
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout, jint epfd)
{
/* address is the native address of the epoll_event array */
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
最后再看一下,怎么获得就绪的连接,即selectedKeys
。
/**
 * Excerpt of the selector implementation base class showing the selected-key accessor.
 */
public abstract class SelectorImpl extends AbstractSelector {

    /**
     * Returns the public view of the selected keys.
     *
     * @return {@code publicSelectedKeys}, as is
     * @throws ClosedSelectorException if the selector is closed and the bug level
     *         is not "1.4"
     */
    public Set<SelectionKey> selectedKeys() {
        boolean rejectClosed = !this.isOpen() && !Util.atBugLevel("1.4");
        if (rejectClosed) {
            throw new ClosedSelectorException();
        }
        // No copying: the stored view is handed out directly.
        return this.publicSelectedKeys;
    }
}
可以发现所有就绪的连接都存在publicSelectedKeys
这个对象中,这个对象实际上是成员selectedKeys
的一个代理,不允许执行插入操作,其余操作允许,它在构造器中创建。
/**
 * Excerpt of the selector implementation base class showing how the public
 * key views are built.
 */
public abstract class SelectorImpl extends AbstractSelector {
/**
 * Initializes the public views of the key sets.
 *
 * @param var1 the provider that created this selector
 */
protected SelectorImpl(SelectorProvider var1) {
super(var1);
if (Util.atBugLevel("1.4")) {
// bug level "1.4": the internal sets are exposed directly
this.publicKeys = this.keys;
this.publicSelectedKeys = this.selectedKeys;
} else {
// otherwise: keys is wrapped read-only, selectedKeys is wrapped by Util.ungrowableSet
this.publicKeys = Collections.unmodifiableSet(this.keys);
this.publicSelectedKeys = Util.ungrowableSet(this.selectedKeys);
}
}
}
而selectedKeys
在doSelect
的时候更新。看一下具体的放入代码:
/**
 * Excerpt of the epoll-backed selector showing how polled events become
 * selected keys.
 */
class EPollSelectorImpl
extends SelectorImpl
{
/**
 * Performs one selection pass: polls through the wrapper, refreshes the
 * selected-key set, and resets the interrupt state if the wrapper reports one.
 *
 * @param timeout passed unchanged to {@code pollWrapper.poll}
 * @return the count returned by {@code updateSelectedKeys()}
 * @throws ClosedSelectorException if this selector is closed
 * @throws IOException if polling fails
 */
protected int doSelect(long timeout)
throws IOException
{
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
// end() runs even if poll throws
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
/**
 * Walks the entries reported by the last poll and updates the selected-key set.
 *
 * @return the number of keys that were added to the set or whose ready ops
 *         were reported as updated
 */
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
int rOps = pollWrapper.getEventOps(i);
// Key is already in the selected set: only update its ready ops
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
ski.channel.translateAndSetReadyOps(rOps, ski);
// Add the key only if at least one of its interest ops is ready
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
return numKeysUpdated;
}
}