摘要:輔助功能類,提供接口向消息池中發(fā)送各類消息事件,并且提供響應消息的機制。進入消息泵循環(huán)體,以阻塞的方式獲取待處理消息。執(zhí)行消息的派發(fā)。并且將返回值保存在了。我們深入下去看看層的部分,在這里明顯生成了一個新的,并且將地址作為返回值返回了。
概述
android里的消息機制是非常重要的部分,這次我希望能夠系統的剖析這個部分,作為一個總結。
首先這里涉及到幾個部分,從層次上看,分為java層和native層2部分;從類上看,分為Handler/Looper/Message/MessageQueue。
Handler:輔助功能類,提供接口向消息池中發(fā)送各類消息事件,并且提供響應消息的機制。
Looper:消息泵,不斷的循環(huán)處理消息隊列中的每個消息,確保最終分發(fā)給處理者。
Message:消息體,承載消息內容。
MessageQueue:消息隊列,提供消息池和緩存。
結構關系他們之間的關系就是Looper使用MessageQueue提供機制,Handler提供調用接口和回調處理,Message作為載體數據傳遞。
我們下面逐次剖析。
一. Looperprepare --- 準備和初始化
這里的準備過程分為2個接口,分別是prepare和prepareMainLooper。區(qū)別是前者給線程提供,后者是給主UI線程調用。我們看下代碼來確認區(qū)別:
public static void prepare() { prepare(true); } public static void prepareMainLooper() { prepare(false); synchronized (Looper.class) { if (sMainLooper != null) { throw new IllegalStateException("The main Looper has already been prepared."); } sMainLooper = myLooper(); } }
首先調用內部帶參數的靜態(tài)方法prepare給的參數不同,true表示可以退出此looper,false表示不允許退出。prepareMainLooper中將這個looper對象賦值給了一個靜態(tài)私有變量sMainLooper保存下來。往下看帶參數的prepare:
private static void prepare(boolean quitAllowed) { if (sThreadLocal.get() != null) { throw new RuntimeException("Only one Looper may be created per thread"); } sThreadLocal.set(new Looper(quitAllowed)); }
首先是個tls的獲取和設置,這里做了判定,一個線程只能有一個Looper對象,然后創(chuàng)建的Looper設置在tls對象中。再來就是構造了:
private Looper(boolean quitAllowed) { mQueue = new MessageQueue(quitAllowed); mThread = Thread.currentThread(); }
創(chuàng)建了一個MessageQueue對象保存下來,然后將當前的線程對象保留下來。
至此準備過程完畢,總結一下,2個入口,不同的場景。tls對象的使用并確保每個線程都有唯一的一個Looper對象。
loop --- 消息循環(huán)
這個才是核心部分,循環(huán)進行消息的獲取及派發(fā)工作。我們直接上代碼:
public static void loop() { // 獲取tls的唯一Looper final Looper me = myLooper(); if (me == null) { throw new RuntimeException("No Looper; Looper.prepare() wasn"t called on this thread."); } // 獲取Looper中的消息隊列 final MessageQueue queue = me.mQueue; // Make sure the identity of this thread is that of the local process, // and keep track of what that identity token actually is. Binder.clearCallingIdentity(); final long ident = Binder.clearCallingIdentity(); // 進入消息泵循環(huán)體 for (;;) { // 獲取一個待處理的消息,有可能會阻塞,后面分析MessageQueue的時候回闡述 Message msg = queue.next(); // might block if (msg == null) { // No message indicates that the message queue is quitting. return; } // This must be in a local variable, in case a UI event sets the logger final Printer logging = me.mLogging; if (logging != null) { logging.println(">>>>> Dispatching to " + msg.target + " " + msg.callback + ": " + msg.what); } // 跟蹤消息 final long traceTag = me.mTraceTag; if (traceTag != 0 && Trace.isTagEnabled(traceTag)) { Trace.traceBegin(traceTag, msg.target.getTraceName(msg)); } try { // 處理消息的派發(fā) msg.target.dispatchMessage(msg); } finally { // 結束跟蹤 if (traceTag != 0) { Trace.traceEnd(traceTag); } } if (logging != null) { logging.println("<<<<< Finished to " + msg.target + " " + msg.callback); } // Make sure that during the course of dispatching the // identity of the thread wasn"t corrupted. final long newIdent = Binder.clearCallingIdentity(); if (ident != newIdent) { Log.wtf(TAG, "Thread identity changed from 0x" + Long.toHexString(ident) + " to 0x" + Long.toHexString(newIdent) + " while dispatching to " + msg.target.getClass().getName() + " " + msg.callback + " what=" + msg.what); } // 回收處理后的消息,將其放入消息池中,準備復用 msg.recycleUnchecked(); } }
過程如下:
1.獲取線程tls對象,那個唯一的looper,然后獲取MessageQueue消息隊列。
2.進入消息泵循環(huán)體,以阻塞的方式獲取待處理消息。
3.執(zhí)行消息的派發(fā)。
4.回收處理后的消息,放入消息池中,等待復用。
5.回頭2,開始下一個。
這里可以看出,大多數都是調用MessageQueue或者Message或者Handler來處理具體事務,loop這里只是個邏輯流程處理,很簡單。其實無論什么平臺下的消息機制大體都是這種流程。其中注意的是,處理消息派發(fā)的部分:msg.target.dispatchMessage(msg)。這個調用其實走的是message里面保留的handler的dispatchMessage,后面具體講到handler的時候會闡述。
quit --- 退出消息泵
退出過程比較簡單:
public void quit() { mQueue.quit(false); } public void quitSafely() { mQueue.quit(true); }
有2個調用,分別是正常退出和安全退出。區(qū)別是前者移除所有消息,后者是只移除尚未處理的消息。具體的在MessageQueue中闡述。
二. Handlerhandler我們最普通的用法就是new出來之后,重載handleMessage方法,來等待消息觸發(fā)并在這里寫下處理。之后無非就是在合適的時候調用sendMessage發(fā)送消息了。
Handler --- 構造
光是構造函數就有好多個,最后無非就2個入口:
public Handler(Callback callback, boolean async) { if (FIND_POTENTIAL_LEAKS) { final Class extends Handler> klass = getClass(); if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) && (klass.getModifiers() & Modifier.STATIC) == 0) { Log.w(TAG, "The following Handler class should be static or leaks might occur: " + klass.getCanonicalName()); } } mLooper = Looper.myLooper(); if (mLooper == null) { throw new RuntimeException( "Can"t create handler inside thread that has not called Looper.prepare()"); } mQueue = mLooper.mQueue; mCallback = callback; mAsynchronous = async; } public Handler(Looper looper, Callback callback, boolean async) { mLooper = looper; mQueue = looper.mQueue; mCallback = callback; mAsynchronous = async; }
仔細看,其實都是在根據參數設置環(huán)境,共3個參數,looper、callback、async。looper是要綁定的looper就是說這個handler是要執(zhí)行在哪個線程的looper上的,一般我們使用的時候都是不指定,那么默認就是當前線程,這里是允許在其他任意線程的;callback是一個相應消息的回調,后面說;async表示是否異步執(zhí)行,關系到callback或者其他的處理消息體的執(zhí)行方式。在2個參數的構造中,Looper.myLooper();指定了是本線程的looper。
dispatchMessage --- 消息派發(fā)
還記得上面的looper的loop調用中,處理具體message的派發(fā)使用的是msg.target.dispatchMessage(msg)。這個target就是handler。那么我們直接看dispatchMessage,相關代碼如下:
public interface Callback { public boolean handleMessage(Message msg); } public void dispatchMessage(Message msg) { if (msg.callback != null) { handleCallback(msg); } else { if (mCallback != null) { if (mCallback.handleMessage(msg)) { return; } } handleMessage(msg); } } private static void handleCallback(Message message) { message.callback.run(); }
1.首先判斷message的callback是否存在,如果存在,調用這個callback,注意,查看message源碼可知,callback是個runnable。
2.否則,構造中保存的callback是否存在,如果存在,調用他的handleMessage方法。
3.如果上面2個條件都不滿足,調用自身的handleMessage。這個可以復寫。
我們能夠知道什么?
3個回調,分別是message的runnable,handler構造的callback,handler的自身handleMessage。這3個的調用是排他性的,一旦一個滿足,就直接返回,不再走別的。
sendMessage --- 發(fā)送消息
發(fā)送消息的過程本身有2個調用,一個是sendMessageXXX,一個是post。前者最終都會調用到sendMessageAtTime:
public final boolean sendMessageDelayed(Message msg, long delayMillis) { if (delayMillis < 0) { delayMillis = 0; } return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis); } public boolean sendMessageAtTime(Message msg, long uptimeMillis) { MessageQueue queue = mQueue; if (queue == null) { RuntimeException e = new RuntimeException( this + " sendMessageAtTime() called with no mQueue"); Log.w("Looper", e.getMessage(), e); return false; } return enqueueMessage(queue, msg, uptimeMillis); } private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) { msg.target = this; if (mAsynchronous) { msg.setAsynchronous(true); } return queue.enqueueMessage(msg, uptimeMillis); }
其實最后走的都是MessageQueue的enqueueMessage。具體的我們在后面介紹MessageQueue的時候闡述。需要注意的是mAsynchronous,這個是否異步的標記,設置在了message里面。還有就是delayed的發(fā)送,其實是本地的系統當前時間加上延遲的時間差。
再來看看post:
public final boolean post(Runnable r) { return sendMessageDelayed(getPostMessage(r), 0); } private static Message getPostMessage(Runnable r) { Message m = Message.obtain(); m.callback = r; return m; }
最后也是走的sendmessage,但是區(qū)別是如果是Post調用,會將傳遞進來的runnable設置到message的callback中。
三. Message既然message是載體,那么先來看看數據內容:
// 消息的唯一key public int what; // 消息支持的2個參數,都是int類型 public int arg1; public int arg2; // 消息內容 public Object obj; // 這個是一個應答的信使,其實是和信使服務有關系的一個東西,這里暫時不做解釋 public Messenger replyTo; // 消息觸發(fā)的時間 /*package*/ long when; // 消息相應的handler /*package*/ Handler target; // 消息回調 /*package*/ Runnable callback; // 本消息的下一個 /*package*/ Message next; // 消息池,其實就是第一個消息 private static Message sPool; // 消息池當前的大小 private static int sPoolSize = 0;
obtain --- 獲取消息
public static Message obtain() { synchronized (sPoolSync) { if (sPool != null) { Message m = sPool; sPool = m.next; m.next = null; m.flags = 0; // clear in-use flag sPoolSize--; return m; } } return new Message(); }
這個sPool是一個靜態(tài)私有的變量,存儲的就是一個鏈表性質的表頭元素message。取出鏈表頭的元素,將鏈表表頭往后移動一個元素??梢钥闯鲞@個sPool表頭對應的鏈表就是一個回收后可復用的所有的message的集合,由于是靜態(tài)私有的,因此這里相當于一個全局的存在。
再看下回收就會比較清楚是如何將廢棄的message存儲的。
recycle --- 回收消息
public void recycle() { if (isInUse()) { if (gCheckRecycle) { throw new IllegalStateException("This message cannot be recycled because it " + "is still in use."); } return; } recycleUnchecked(); } void recycleUnchecked() { // Mark the message as in use while it remains in the recycled object pool. // Clear out all other details. flags = FLAG_IN_USE; what = 0; arg1 = 0; arg2 = 0; obj = null; replyTo = null; sendingUid = -1; when = 0; target = null; callback = null; data = null; synchronized (sPoolSync) { if (sPoolSize < MAX_POOL_SIZE) { next = sPool; sPool = this; sPoolSize++; } } }
在recycleUnchecked中就是修改自身message的成員,將其清空,然后判斷如果沒有超過這個鏈表的最大上限,則將這個message自身存儲為sPool,就是作為表頭了,然后再將pool的size增1。
從以上可以看到,message的復用機制是獨立的,與消息隊列并不直接關系,耦合性較低。
四. MessageQueueMessageQueue里會涉及到c層,也就是native層的內容,其實他大部分核心內容都是在c層完成的。java層是個銜接部分。
構造
MessageQueue的構造是在Looper的構造中完成的,也就是說一個線程有一個looper一個MessageQueue。
MessageQueue(boolean quitAllowed) { mQuitAllowed = quitAllowed; mPtr = nativeInit(); }
構造里面直接走了nativeInit。并且將返回值保存在了mPtr。我們深入下去看看c層的部分,在frameworks/base/core/jni/android_os_MessageQueue.cpp:
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) { NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue(); if (!nativeMessageQueue) { jniThrowRuntimeException(env, "Unable to allocate native queue"); return 0; } nativeMessageQueue->incStrong(env); return reinterpret_cast(nativeMessageQueue); }
這里明顯生成了一個新的NativeMessageQueue,并且將地址作為返回值返回了。這個NativeMessageQueue就是個c層的queue對象。
獲取隊列消息 --- next
回到java層,我們看下next這個至關重要的函數在做什么,在looper的loop中,循環(huán)中第一句就是調用他獲取一個message:
Message next() { // 拿到初始化時候保存的地址,即是c層NativeMessageQueue對象的地址 final long ptr = mPtr; if (ptr == 0) { return null; } int pendingIdleHandlerCount = -1; // -1 only during first iteration int nextPollTimeoutMillis = 0; // 進入循環(huán),為了獲取到消息 for (;;) { if (nextPollTimeoutMillis != 0) { Binder.flushPendingCommands(); } // 阻塞,有超時 nativePollOnce(ptr, nextPollTimeoutMillis); synchronized (this) { // Try to retrieve the next message. Return if found. final long now = SystemClock.uptimeMillis(); Message prevMsg = null; Message msg = mMessages; // 當消息的handler為Null,找下一個異步的消息 if (msg != null && msg.target == null) { // Stalled by a barrier. Find the next asynchronous message in the queue. do { prevMsg = msg; msg = msg.next; } while (msg != null && !msg.isAsynchronous()); } if (msg != null) { if (now < msg.when) { // Next message is not ready. Set a timeout to wake up when it is ready. // 如果消息的觸發(fā)時間大于當前時鐘,則設置下一次阻塞等待超時為這個差值 nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE); } else { // 得到并返回一個message,這里是個鏈表操作 mBlocked = false; if (prevMsg != null) { prevMsg.next = msg.next; } else { mMessages = msg.next; } msg.next = null; if (DEBUG) Log.v(TAG, "Returning message: " + msg); msg.markInUse(); return msg; } } else { // No more messages. nextPollTimeoutMillis = -1; } // 退出情況的判斷 if (mQuitting) { dispose(); return null; } // 空閑時候的idlerHandler處理 if (pendingIdleHandlerCount < 0 && (mMessages == null || now < mMessages.when)) { pendingIdleHandlerCount = mIdleHandlers.size(); } if (pendingIdleHandlerCount <= 0) { // No idle handlers to run. Loop and wait some more. mBlocked = true; continue; } if (mPendingIdleHandlers == null) { mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)]; } mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers); } // Run the idle handlers. // We only ever reach this code block during the first iteration. for (int i = 0; i < pendingIdleHandlerCount; i++) { final IdleHandler idler = mPendingIdleHandlers[i]; mPendingIdleHandlers[i] = null; // release the reference to the handler boolean keep = false; try { keep = idler.queueIdle(); } catch (Throwable t) { Log.wtf(TAG, "IdleHandler threw exception", t); } if (!keep) { synchronized (this) { mIdleHandlers.remove(idler); } } } // Reset the idle handler count to 0 so we do not run them again. pendingIdleHandlerCount = 0; // While calling an idle handler, a new message could have been delivered // so go back and look again for a pending message without waiting. nextPollTimeoutMillis = 0; } }
1.通過之前初始化時保留的NativeMessageQueue阻塞獲取消息;
2.如果不是立即執(zhí)行的消息,并且沒有到達執(zhí)行點,根據該消息與當前時鐘的差值動態(tài)調節(jié)下一次阻塞獲取的超時時間;
3.如果到達執(zhí)行點的消息,操作鏈表,并返回該消息;
4.如果沒有消息可供處理,執(zhí)行所有之前注冊的IdleHandler;
往下看的話就是這個阻塞獲取消息的nativePollOnce了,繼續(xù)。
上面這個函數需要進入c層:
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); }
這里進入了NativeMessageQueue,為了了解c層的具體情況,我們需要分析下初始化過程。
五. c層運轉初始化
首先還是需要看看NativeMessageQueue類的初始化:
NativeMessageQueue::NativeMessageQueue() : mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) { mLooper = Looper::getForThread(); if (mLooper == NULL) { mLooper = new Looper(false); Looper::setForThread(mLooper); } }
這里又有一個Looper,注意,這里的已經不是java層的那個了,而是c層自身的Looper,在/system/core/libutils/Looper.cpp這里,還是老規(guī)矩,看看他初始化的時候:
Looper::Looper(bool allowNonCallbacks) : mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false), mPolling(false), mEpollFd(-1), mEpollRebuildRequired(false), mNextRequestSeq(0), mResponseIndex(0), mNextMessageUptime(LLONG_MAX) { mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); LOG_ALWAYS_FATAL_IF(mWakeEventFd < 0, "Could not make wake event fd: %s", strerror(errno)); AutoMutex _l(mLock); rebuildEpollLocked(); } void Looper::rebuildEpollLocked() { // Close old epoll instance if we have one. if (mEpollFd >= 0) { #if DEBUG_CALLBACKS ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this); #endif close(mEpollFd); } // Allocate the new epoll instance and register the wake pipe. mEpollFd = epoll_create(EPOLL_SIZE_HINT); LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno)); struct epoll_event eventItem; memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union eventItem.events = EPOLLIN; eventItem.data.fd = mWakeEventFd; int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem); LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s", strerror(errno)); for (size_t i = 0; i < mRequests.size(); i++) { const Request& request = mRequests.valueAt(i); struct epoll_event eventItem; request.initEventItem(&eventItem); int epollResult = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, request.fd, & eventItem); if (epollResult < 0) { ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s", request.fd, strerror(errno)); } } }
看到了吧,使用了epoll來監(jiān)控多個fd。首先是一個喚醒的事件fd,然后是根據request隊列的每個request來添加不同的監(jiān)控fd。request是什么呢?我們暫時先放一下,后面會闡述。
總結一下初始化過程:
讀取消息 --- nativePollOnce
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj, jlong ptr, jint timeoutMillis) { NativeMessageQueue* nativeMessageQueue = reinterpret_cast(ptr); nativeMessageQueue->pollOnce(env, obj, timeoutMillis); } void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) { mPollEnv = env; mPollObj = pollObj; mLooper->pollOnce(timeoutMillis); mPollObj = NULL; mPollEnv = NULL; if (mExceptionObj) { env->Throw(mExceptionObj); env->DeleteLocalRef(mExceptionObj); mExceptionObj = NULL; } }
從這里可以看到,其實是通過c層的Looper調用pollOnce來完成的。
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) { int result = 0; for (;;) { // 處理每個response里的request,如果沒有回調,直接返回 while (mResponseIndex < mResponses.size()) { const Response& response = mResponses.itemAt(mResponseIndex++); int ident = response.request.ident; if (ident >= 0) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning signalled identifier %d: " "fd=%d, events=0x%x, data=%p", this, ident, fd, events, data); #endif if (outFd != NULL) *outFd = fd; if (outEvents != NULL) *outEvents = events; if (outData != NULL) *outData = data; return ident; } } if (result != 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - returning result %d", this, result); #endif if (outFd != NULL) *outFd = 0; if (outEvents != NULL) *outEvents = 0; if (outData != NULL) *outData = NULL; return result; } result = pollInner(timeoutMillis); } }
重點就一個:pollInner:
...... // Poll. int result = POLL_WAKE; mResponses.clear(); mResponseIndex = 0; // We are about to idle. mPolling = true; // 最大處理16個fd struct epoll_event eventItems[EPOLL_MAX_EVENTS]; 等待事件發(fā)生或超時 int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis); // No longer idling. mPolling = false; // Acquire lock. mLock.lock(); // 如果需要進行重建epoll if (mEpollRebuildRequired) { mEpollRebuildRequired = false; rebuildEpollLocked(); goto Done; } // <0錯誤處理,直接跳轉到Done if (eventCount < 0) { if (errno == EINTR) { goto Done; } ALOGW("Poll failed with an unexpected error: %s", strerror(errno)); result = POLL_ERROR; goto Done; } // 超時,跳轉到Done if (eventCount == 0) { #if DEBUG_POLL_AND_WAKE ALOGD("%p ~ pollOnce - timeout", this); #endif result = POLL_TIMEOUT; goto Done; } ...... // 循環(huán)處理獲取到的event for (int i = 0; i < eventCount; i++) { int fd = eventItems[i].data.fd; uint32_t epollEvents = eventItems[i].events; if (fd == mWakeEventFd) { // 如果是喚醒的fd,執(zhí)行喚醒處理 if (epollEvents & EPOLLIN) { awoken(); } else { ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents); } } else { // 否則,處理每個request ssize_t requestIndex = mRequests.indexOfKey(fd); if (requestIndex >= 0) { // 創(chuàng)建新的events,并通過pushResponse生成新的response,push int events = 0; if (epollEvents & EPOLLIN) events |= EVENT_INPUT; if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT; if (epollEvents & EPOLLERR) events |= EVENT_ERROR; if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP; pushResponse(events, mRequests.valueAt(requestIndex)); } else { ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is " "no longer registered.", epollEvents, fd); } } } Done: ; // Invoke pending message callbacks. mNextMessageUptime = LLONG_MAX; // 處理堆積未處理的事件 while (mMessageEnvelopes.size() != 0) { nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC); const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0); if (messageEnvelope.uptime <= now) { // Remove the envelope from the list. // We keep a strong reference to the handler until the call to handleMessage // finishes. Then we drop it so that the handler can be deleted *before* // we reacquire our lock. { // obtain handler sphandler = messageEnvelope.handler; Message message = messageEnvelope.message; mMessageEnvelopes.removeAt(0); mSendingMessage = true; mLock.unlock(); #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - sending message: handler=%p, what=%d", this, handler.get(), message.what); #endif handler->handleMessage(message); } // release handler mLock.lock(); mSendingMessage = false; result = POLL_CALLBACK; } else { // The last message left at the head of the queue determines the next wakeup time. mNextMessageUptime = messageEnvelope.uptime; break; } } // Release lock. mLock.unlock(); // 處理每個response for (size_t i = 0; i < mResponses.size(); i++) { Response& response = mResponses.editItemAt(i); if (response.request.ident == POLL_CALLBACK) { int fd = response.request.fd; int events = response.events; void* data = response.request.data; #if DEBUG_POLL_AND_WAKE || DEBUG_CALLBACKS ALOGD("%p ~ pollOnce - invoking fd event callback %p: fd=%d, events=0x%x, data=%p", this, response.request.callback.get(), fd, events, data); #endif // Invoke the callback. Note that the file descriptor may be closed by // the callback (and potentially even reused) before the function returns so // we need to be a little careful when removing the file descriptor afterwards. int callbackResult = response.request.callback->handleEvent(fd, events, data); if (callbackResult == 0) { removeFd(fd, response.request.seq); } // Clear the callback reference in the response structure promptly because we // will not clear the response vector itself until the next poll. response.request.callback.clear(); result = POLL_CALLBACK; } } return result;
總結一下:
1.通過epoll_wait執(zhí)行等待事件的操作;
2.根據等待到的event與request數組,生成response并push;
3.循環(huán)處理堆積的未處理的mMessageEnvelopes事件;
4.處理所有response;
這里引出3個東西,request/response/mMessageEnvelopes。我們分別解釋下。
首先,request的add動作是在addFd時候調用的,因此這里應該是將fd與關心的event綁定的東西。一個fd可綁定多個事件,通過|操作符。后面每次收到event后,使用&來判斷是否存在關心的事件,如果是執(zhí)行pushResponse。
response就更簡單了,就是一個events與request的對應關系的維護:
struct Request { int fd; int ident; int events; int seq; spcallback; void* data; void initEventItem(struct epoll_event* eventItem) const; }; struct Response { int events; Request request; };
mRequests是個以fd作為索引的vector,mResponses就干脆就是個vector。
mMessageEnvelopes是個vector,存儲的是MessageEnvelope對象:
struct MessageEnvelope { MessageEnvelope() : uptime(0) { } MessageEnvelope(nsecs_t u, const sph, const Message& m) : uptime(u), handler(h), message(m) { } nsecs_t uptime; sp handler; Message message; };
在sendMessageAtTime里生成并insertAt了MessageEnvelope。因此可以看出,MessageEnvelope其實就是緩存了需要處理的message,并記錄了需要執(zhí)行的時間uptime和handler,及消息體message。
處理消息 --- handler的調用
處理消息的過程其實在上面已經表明了,就是調用response.request.callback->handleEvent(fd,?events,?data);這句話,那么來看看這個callback是怎么回事:
class?LooperCallback?:?public?virtual?RefBase?{ protected: ????virtual?~LooperCallback(); public: ????/** ?????*?Handles?a?poll?event?for?the?given?file?descriptor. ?????*?It?is?given?the?file?descriptor?it?is?associated?with, ?????*?a?bitmask?of?the?poll?events?that?were?triggered?(typically?EVENT_INPUT), ?????*?and?the?data?pointer?that?was?originally?supplied. ?????* ?????*?Implementations?should?return?1?to?continue?receiving?callbacks,?or?0 ?????*?to?have?this?file?descriptor?and?callback?unregistered?from?the?looper. ?????*/ ????virtual?int?handleEvent(int?fd,?int?events,?void*?data)?=?0; };
就是個回調的對象,在addFd的時候需要傳遞進去。在setFileDescriptorEvents的時候調用了addFd,給定的是this,也就是說,回調響應由NativeMessageQueue自行截獲。順便說下,這個setFileDescriptorEvents最后還是提供給Java層調用的,對應的是nativeSetFileDescriptorEvents函數。
好吧,我們回來,既然調用的是handleEvent,那么我們就看看這個東西:
int?NativeMessageQueue::handleEvent(int?fd,?int?looperEvents,?void*?data)?{ ????int?events?=?0; ????if?(looperEvents?&?Looper::EVENT_INPUT)?{ ????????events?|=?CALLBACK_EVENT_INPUT; ????} ????if?(looperEvents?&?Looper::EVENT_OUTPUT)?{ ????????events?|=?CALLBACK_EVENT_OUTPUT; ????} ????if?(looperEvents?&?(Looper::EVENT_ERROR?|?Looper::EVENT_HANGUP?|?Looper::EVENT_INVALID))?{ ????????events?|=?CALLBACK_EVENT_ERROR; ????} ????int?oldWatchedEvents?=?reinterpret_cast(data); ????int?newWatchedEvents?=?mPollEnv->CallIntMethod(mPollObj, ????????????gMessageQueueClassInfo.dispatchEvents,?fd,?events); ????if?(!newWatchedEvents)?{ ????????return?0;?//?unregister?the?fd ????} ????if?(newWatchedEvents?!=?oldWatchedEvents)?{ ????????setFileDescriptorEvents(fd,?newWatchedEvents); ????} ????return?1; }
組合event后,調用的是mPollEnv->CallIntMethod(mPollObj,
????????????gMessageQueueClassInfo.dispatchEvents,?fd,?events);能看出來吧,mPollEnv是JNIEnv,那么這個明顯是調用java層的方法,是誰呢?就是MessageQueue.dispatchEvents:
private?int?dispatchEvents(int?fd,?int?events)?{ ????????//?Get?the?file?descriptor?record?and?any?state?that?might?change. ????????final?FileDescriptorRecord?record; ????????final?int?oldWatchedEvents; ????????final?OnFileDescriptorEventListener?listener; ????????final?int?seq; ????????synchronized?(this)?{ ????????????record?=?mFileDescriptorRecords.get(fd); ????????????if?(record?==?null)?{ ????????????????return?0;?//?spurious,?no?listener?registered ????????????} ????????????oldWatchedEvents?=?record.mEvents; ????????????events?&=?oldWatchedEvents;?//?filter?events?based?on?current?watched?set ????????????if?(events?==?0)?{ ????????????????return?oldWatchedEvents;?//?spurious,?watched?events?changed ????????????} ????????????listener?=?record.mListener; ????????????seq?=?record.mSeq; ????????} ????????//?Invoke?the?listener?outside?of?the?lock. ????????int?newWatchedEvents?=?listener.onFileDescriptorEvents( ????????????????record.mDescriptor,?events); ????????if?(newWatchedEvents?!=?0)?{ ????????????newWatchedEvents?|=?OnFileDescriptorEventListener.EVENT_ERROR; ????????} ????????//?Update?the?file?descriptor?record?if?the?listener?changed?the?set?of ????????//?events?to?watch?and?the?listener?itself?hasn"t?been?updated?since. ????????if?(newWatchedEvents?!=?oldWatchedEvents)?{ ????????????synchronized?(this)?{ ????????????????int?index?=?mFileDescriptorRecords.indexOfKey(fd); ????????????????if?(index?>=?0?&&?mFileDescriptorRecords.valueAt(index)?==?record ????????????????????????&&?record.mSeq?==?seq)?{ ????????????????????record.mEvents?=?newWatchedEvents; ????????????????????if?(newWatchedEvents?==?0)?{ ????????????????????????mFileDescriptorRecords.removeAt(index); ????????????????????} ????????????????} ????????????} ????????} ????????//?Return?the?new?set?of?events?to?watch?for?native?code?to?take?care?of. ????????return?newWatchedEvents; ????}
其實最主要的就是調用了listener.onFileDescriptorEvents(
????????????????record.mDescriptor,?events);,其實就是調用之前設置好的監(jiān)聽者響應。是根據fd來選擇listener的。
我查了一下,調用addOnFileDescriptorEventListener的只有在java層的ParcelFileDescriptor.fromFd有這個動作,再深入查下去就是MountService.mountAppFuse來做這個事情。感覺是在mountApp的時候做的這個監(jiān)聽。
總之這個過程是要在有事件響應的時候根據事件的情況(EVENT_INPUT/EVENT_OUTPUT/EVENT_ERROR/EVENT_HANGUP/EVENT_INVALID)如果有這些情況,則需要通知對應的監(jiān)聽者進行響應,但是看情況跟message本身的處理關系就不大了。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/66816.html
摘要:在子線程中發(fā)送消息,主線程接受到消息并且處理邏輯。也稱之為消息隊列,特點是先進先出,底層實現是單鏈表數據結構得出結論方法初始話了一個對象并關聯在一個對象,并且一個線程中只有一個對象,只有一個對象。 目錄介紹 1.Handler的常見的使用方式 2.如何在子線程中定義Handler 3.主線程如何自動調用Looper.prepare() 4.Looper.prepare()方法源碼分析...
摘要:今天,我我的后端書架后端掘金我的后端書架月前本書架主要針對后端開發(fā)與架構。尤其是對稱加密,非對稱加密,私鑰加密,公鑰加密滴滴動態(tài)化方案的誕生與起航掘金這是滴滴架構組發(fā)布的第一篇公共技術文章,本文將介紹自研的動態(tài)化方案。 android 阿里面試題錦集 - Android - 掘金前幾天突然就經歷了阿里android實習內推的電面,感覺有好多以前看過的東西都忘記了,然后又復習了一下,找了...
摘要:閱讀本期周刊,你將快速入門,開啟甜蜜之旅。然則的原理負責發(fā)送以及處理消息,創(chuàng)建消息隊列并不斷從隊列中取出消息交給,則用于保存消息。 showImg(/img/bVCN99?w=900&h=385); 2016 年 8 月,Android 7.0 Nougat(牛軋?zhí)牵┱桨l(fā)布,那么問題來了,你 Marshmallow 了么(? -? ?) Cupcake、Donut、Gingerbre...
閱讀 3599·2023-04-26 02:55
閱讀 2866·2021-11-02 14:38
閱讀 4146·2021-10-21 09:39
閱讀 2856·2021-09-27 13:36
閱讀 3967·2021-09-22 15:08
閱讀 2657·2021-09-08 10:42
閱讀 2811·2019-08-29 12:21
閱讀 678·2019-08-29 11:22