1、源碼:
// NettyServer.java, Boss 線程管理組, 上面NettyServer.java中的示例代碼
bossGroup = new NioEventLoopGroup(1);
// NioEventLoopGroup.java
/**
* Create a new instance using the specified number of threads, {@link ThreadFactory} and the
* {@link SelectorProvider} which is returned by {@link SelectorProvider#provider()}.
*/
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
// NioEventLoopGroup.java
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
// NioEventLoopGroup.java
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
// NioEventLoopGroup.java
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
// MultithreadEventLoopGroup.java
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// DEFAULT_EVENT_LOOP_THREADS 默認(rèn)為CPU核數(shù)的2倍
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// MultithreadEventExecutorGroup.java
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
// MultithreadEventExecutorGroup.java
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param chooserFactory the {@link EventExecutorChooserFactory} to use.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) { // 小于或等于零都會(huì)直接拋異常,由此可見,要想使用netty,還得必須至少得有1個(gè)線程跑起來才能使用
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) { // 如果調(diào)用方不想自己定制線程池的話,那么則用netty自己默認(rèn)的線程池
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads]; // 構(gòu)建孩子結(jié)點(diǎn)數(shù)組,也就是構(gòu)建NioEventLoopGroup持有的線程數(shù)組
for (int i = 0; i < nThreads; i ++) { // 循環(huán)線程數(shù),依次創(chuàng)建實(shí)例化線程封裝的對(duì)象NioEventLoop
boolean success = false;
try {
children[i] = newChild(executor, args); // 最終調(diào)用到了NioEventLoopGroup類中的newChild方法
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 實(shí)例化選擇線程器,也就是說我們要想執(zhí)行任務(wù),對(duì)于nThreads個(gè)線程,我們得靠一個(gè)規(guī)則來如何選取哪個(gè)具體線程來執(zhí)行任務(wù);
// 那么chooser就是來干這個(gè)事情的,它主要是幫我們選出需要執(zhí)行任務(wù)的線程封裝對(duì)象NioEventLoop
chooser = chooserFactory.newChooser(children);
final FutureListener
4.2、實(shí)例化線程管理組的孩子結(jié)點(diǎn)children[i]
1、源碼:
// MultithreadEventExecutorGroup.java, 最終調(diào)用到了NioEventLoopGroup類中的newChild方法
children[i] = newChild(executor, args);
// NioEventLoopGroup.java
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
// NioEventLoop.java
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 調(diào)用父類的構(gòu)造方法
// DEFAULT_MAX_PENDING_TASKS 任務(wù)隊(duì)列初始化容量值,默認(rèn)值為:Integer.MAX_VALUE
// 若不想使用默認(rèn)值的話,那么就得自己配置 io.netty.eventLoop.maxPendingTasks 屬性值為自己想要的值
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
// 這個(gè)對(duì)象在NioEventLoopGroup的構(gòu)造函數(shù)中通過SelectorProvider.provider()獲得,然后一路傳參到此類
provider = selectorProvider;
// 通過調(diào)用JDK底層類庫,為每個(gè)NioEventLoop配備一個(gè)多路復(fù)用器
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
// SingleThreadEventLoop.java
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
// 調(diào)用父類的構(gòu)造方法
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 構(gòu)造任務(wù)隊(duì)列,最終會(huì)調(diào)用NioEventLoop的newTaskQueue(int maxPendingTasks)方法
tailTasks = newTaskQueue(maxPendingTasks);
}
// SingleThreadEventExecutor.java
/**
* Create a new instance
*
* @param parent the {@link EventExecutorGroup} which is the parent of this instance and belongs to it
* @param executor the {@link Executor} which will be used for executing
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
* @param rejectedHandler the {@link RejectedExecutionHandler} to use.
*/
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
// 調(diào)用父類的構(gòu)造方法
super(parent);
this.addTaskWakesUp = addTaskWakesUp; // 添加任務(wù)時(shí)是否需要喚醒多路復(fù)用器的阻塞狀態(tài)
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
// AbstractScheduledEventExecutor.java
protected AbstractScheduledEventExecutor(EventExecutorGroup parent) {
// 調(diào)用父類的構(gòu)造方法
super(parent);
}
// AbstractEventExecutor.java
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
2、該流程主要實(shí)例化線程管理組的孩子結(jié)點(diǎn)children[i],孩子結(jié)點(diǎn)的類型為NioEventLoop類型;
3、仔細(xì)一看,netty的開發(fā)者對(duì)命名也很講究,線程管理組的類名為NioEventLoopGroup,線程管理組的子線程類名為NioEventLoop,
有沒有發(fā)現(xiàn)有什么不一樣的地方?其實(shí)就是差了個(gè)Group幾個(gè)字母,線程管理組自然以Group結(jié)尾,不是組的就自然沒有Group字母;
4、每個(gè)NioEventLoop都持有組的線程池executor對(duì)象,方便添加task到任務(wù)隊(duì)列中;
5、每個(gè)NioEventLoop都有一個(gè)selector多路復(fù)用器,而那些Channel就是注冊(cè)到這個(gè)玩意上面的;
6、每個(gè)NioEventLoop都有一個(gè)任務(wù)隊(duì)列,而且這個(gè)隊(duì)列的初始化容器大小為1024;
4.3、如何構(gòu)建任務(wù)隊(duì)列
1、源碼:
// SingleThreadEventLoop.java, 構(gòu)造任務(wù)隊(duì)列,最終會(huì)調(diào)用NioEventLoop的newTaskQueue(int maxPendingTasks)方法
tailTasks = newTaskQueue(maxPendingTasks);
// NioEventLoop.java
@Override
protected Queue newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
// 由于默認(rèn)是沒有配置io.netty.eventLoop.maxPendingTasks屬性值的,所以maxPendingTasks默認(rèn)值為Integer.MAX_VALUE;
// 那么最后配備的任務(wù)隊(duì)列的大小也就自然使用無參構(gòu)造隊(duì)列方法
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.newMpscQueue()
: PlatformDependent.newMpscQueue(maxPendingTasks);
}
// PlatformDependent.java
/**
* Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single
* consumer (one thread!).
* @return A MPSC queue which may be unbounded.
*/
public static Queue newMpscQueue() {
return Mpsc.newMpscQueue();
}
// Mpsc.java
static Queue newMpscQueue() {
// 默認(rèn)值 MPSC_CHUNK_SIZE = 1024;
return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue(MPSC_CHUNK_SIZE)
: new MpscUnboundedAtomicArrayQueue(MPSC_CHUNK_SIZE);
}
2、這里主要看看NioEventLoop是如何構(gòu)建任務(wù)隊(duì)列的,而且還構(gòu)建了一個(gè)給定初始化容量值大小的隊(duì)列;
4.4、如何獲得多路復(fù)用器
1、源碼:
// NioEventLoop.java, 通過調(diào)用JDK底層類庫,為每個(gè)NioEventLoop配備一個(gè)多路復(fù)用器
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
// NioEventLoop.java
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 通過 provider 調(diào)用底層獲取一個(gè)多路復(fù)用器對(duì)象
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// DISABLE_KEYSET_OPTIMIZATION: 是否優(yōu)化選擇器key集合,默認(rèn)為不優(yōu)化
if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
// 執(zhí)行到此,說明需要優(yōu)化選擇器集合,首先創(chuàng)建一個(gè)選擇器集合
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 然后通過反射找到SelectorImpl對(duì)象
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() {
@Override
public Object run() {
try {
// 通過反射獲取SelectorImpl實(shí)現(xiàn)類對(duì)象
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument.
!((Class>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
if (maybeSelectorImplClass instanceof Throwable) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector);
}
final Class> selectorImplClass = (Class>) maybeSelectorImplClass;
// 以下run方法的主要目的就是將我們自己創(chuàng)建的selectedKeySet選擇器集合通過反射替換底層自帶的選擇器集合
Object maybeException = AccessController.doPrivileged(new PrivilegedAction() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector);
}
// 反射執(zhí)行完后,則將創(chuàng)建的selectedKeySet賦值為當(dāng)成員變量
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
2、其實(shí)說獲得多路復(fù)用器,倒不如說多路復(fù)用器從何而來,是通過provider調(diào)用provider.openSelector()方法而獲得的;
3、而這個(gè)provider所產(chǎn)生的地方其內(nèi)部是一個(gè)靜態(tài)變量,細(xì)心的童鞋會(huì)發(fā)現(xiàn)SelectorProvider.provider()這個(gè)里面還真有一個(gè)靜態(tài)provider;
4、而這里給用戶做了一個(gè)選擇是否需要優(yōu)化選擇器,如果需要優(yōu)化則用自己創(chuàng)建的選擇器通過反射塞到底層的多路復(fù)用器對(duì)象中;
4.5、線程選擇器
1、源碼:
// MultithreadEventExecutorGroup.java
// 實(shí)例化選擇線程器,也就是說我們要想執(zhí)行任務(wù),對(duì)于nThreads個(gè)線程,我們得靠一個(gè)規(guī)則來如何選取哪個(gè)具體線程來執(zhí)行任務(wù);
// 那么chooser就是來干這個(gè)事情的,它主要是幫我們選出需要執(zhí)行任務(wù)的線程封裝對(duì)象NioEventLoop
chooser = chooserFactory.newChooser(children);
// DefaultEventExecutorChooserFactory.java
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
// PowerOfTwoEventExecutorChooser.java
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
// GenericEventExecutorChooser.java
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
2、記得在前面說過,在實(shí)例化線程組Group的時(shí)候,會(huì)實(shí)例化一個(gè)線程選擇器,而這個(gè)選擇器的實(shí)現(xiàn)方式也正是由通過線程數(shù)量來決定的;
3、PowerOfTwoEventExecutorChooser與GenericEventExecutorChooser的主要區(qū)別就是,當(dāng)線程個(gè)數(shù)為2的n次方的話,那么則用PowerOfTwoEventExecutorChooser實(shí)例化的選擇器;
4、因?yàn)镋ventExecutorChooser的next()方法,一個(gè)是與操作,一個(gè)是求余操作,而與操作的效率稍微高些,所以在選擇線程這個(gè)細(xì)小的差別,netty的開發(fā)人員也真實(shí)一絲不茍的處理;