Handler是我们非常熟悉的一个组件,它的主要作用就是进行线程间的交互,通常是主线程与其他工作线程间的交互。这套消息机制在应用开发中用的是最多的,我们使用它来实现切换主线程、发送延时消息等。它主要由HandlerLooperMessageQueue三个组件组成,其中Handler负责发送和处理消息,Looper负责循环读取消息,MessageQueue负责存储消息。

简单介绍

整套消息机制我们应该非常熟悉了,具体的使用就不再赘述,这里只简单介绍下各个组件的作用。本文基于Android 13源码。

Handler

Handler负责发送消息和处理消息,发送的消息主要是普通消息和Runnable消息,通常使用postpostAtTimepostDelayed发送Runnable消息,通过sendEmptyMessagesendMessagesendMessageAtTimesendMessageDelayed发送普通消息。

这些消息被发送时,都会在消息上指明target为当前Handler,而等到消息执行时,就会将消息指派给它对应的Handler进行处理。

Message

Message是消息的载体,对于普通消息,可以直接通过它的what属性进行区分,通过arg1arg2承载简单的数据,通过obj承载复杂的对象数据;对于Runnable消息,通过它的callback属性进行承载。同时,Message是被设计成单链表结构的,以及一个消息池,被用过的Message就会被放在消息池中等待复用,消息池就是一个单链表结构。因此如果我们创建Message的时候,最好通过Message.obtain方法创建而不是直接new以复用Message

Looper

Looper是一个消息机制中的驱动模块,它会循环读取消息队列中的消息,然后将其分发给对应的Handler进行处理。当消息队列为空或者没有可执行的消息时,它会阻塞当前线程,当有了可执行的消息时会被唤醒继续循环读取。

MessageQueue

消息队列MessageQueue,主要作用是存储消息,各个Handler发送的消息都会进入消息队列,并且按照消息的执行时间进行排序,时间靠前的排在前面。当Looper获取消息时,从前向后读取消息,如果消息的执行时间在当前时间之后,就会阻塞一直到执行时间后恢复。

Java层源码

我们知道Handler是可以跨线程进行交互的,那么如何跨线程呢?首先线程间的数据本身就是可以共享的,我们可以在某个线程中定义一个数据结构,然后在另一个线程中向这个数据结构中写入数据,这样两个线程就可以交换数据了。生产者消费者模式就是这个原理,而Handler消息机制,本质上也是这个原理。

Handler消息机制中,MessageQueue就是这个共享的数据结构,它存在于Looper中,也就是说如果我们想要在某个线程中启用Handler消息机制,则必须创建一个Looper

1
2
3
4
private Looper(boolean quitAllowed) {
mQueue = new MessageQueue(quitAllowed);
mThread = Thread.currentThread();
}

Looper的构造方法中会创建一个MessageQueue来接受数据,但是它是私有方法,无法直接创建,我们正常是通过prepare方法创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
@UnsupportedAppUsage
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

public static void prepare() {
prepare(true);
}

private static void prepare(boolean quitAllowed) {
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
sThreadLocal.set(new Looper(quitAllowed));
}

prepare中可以看到,如果当前线程中已经创建过了Looper,则直接抛出异常,否则创建Looper并存入到ThreadLocal中,使得一个线程中只存在一个Looper,从而保障了一个线程只有一个MessageQueueThreadLocal是线程局部变量,它通常会被定义成静态变量供多个线程存储和获取变量的,本质上就是拿到线程的ThreadLocalMap,然后往里面存数据就行了,这个不需要过多的了解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void loop() {
// 获取到当前线程对应的Looper
final Looper me = myLooper();
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}

me.mInLoop = true;

for (;;) {
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}

public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}

创建完Looper之后,就通过loop()去循环读取MessageQueue中的消息了。注意loop是个死循环,当前线程的逻辑会一直卡在这里无法再做别的事情了。因此,我们必须在prepareloop之间创建一个或多个Handler,然后将Handler提供出去以便其他线程向当前线程中发送消息,通常的写法如下:

1
2
3
4
5
6
7
8
9
10
11
12
class MyThread extends Thread {

private Handler mHandler;

@Override
public void run() {
super.run();
Looper.prepare();
mHandler = new Handler(Looper.myLooper());
Looper.loop();
}
}

但是主线程不是我们启动的,我们又该如何获取到Handler,如何创建Looper,然后向主线程发消息呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
@UnsupportedAppUsage
private static Looper sMainLooper;

@Deprecated
public static void prepareMainLooper() {
prepare(false);
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}

主线程走的是另一个方法prepareMainLooper,当应用的主线程启动后,会调用prepareMainLooper创建Looper,并赋值给静态变量sMainLooper。因此对于应用而言,sMainLooper是不会为空的,我们也可以通过它来创建Handler

1
private Handler mMainHandler = new Handler(Looper.getMainLooper);

然后便是Handler了,由于Handler是用于发送消息的,所以Handler必须能够获取到Looper以便向它的MessageQueue中发送消息。因此,Handler构造方法中,必须传入Looper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 空参数构造方法,不建议使用
@Deprecated
public Handler() {
this(null, false);
}

// 未提供Looper的构造方法,不建议使用
public Handler(@Nullable Callback callback, boolean async) {
...
// 没有传入looper,直接从当前线程中获取Looper
mLooper = Looper.myLooper();
// 当前线程没有启用Looper,直接抛异常
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
\+ " that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue;
mCallback = callback;
mAsynchronous = async;
mIsShared = false;
}

// 构造方法中传入Looper,推荐使用
public Handler(@NonNull Looper looper) {
this(looper, null, false);
}

// 不允许app使用
@UnsupportedAppUsage
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async) {
this(looper, callback, async, /* shared= */ false);
}

// 不允许app使用
/** @hide */
public Handler(@NonNull Looper looper, @Nullable Callback callback, boolean async,
boolean shared) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
mIsShared = shared;
}

对于空参数的构造方法,则直接从当前线程获取Looper,获取不到直接抛异常。当然,不带Looper的构造方法已经被标记为Deprecated了,再去掉其他的hide的方法等,我们实际使用中通常用的是public Handler(@NonNull Looper looper)这个构造方法。
Handler有很多的发送消息的方法,但最终都是走到了同一个方法中去发送数据。并且最终也是通过queue.enqueueMessage加入到消息队列中。

1
2
3
4
5
6
7
8
9
10
11
private boolean enqueueMessage(@NonNull MessageQueue queue, @NonNull Message msg,
long uptimeMillis) {
// 消息的target赋值为当前handler,最终message会交给target执行的handler执行
msg.target = this;
msg.workSourceUid = ThreadLocalWorkSource.getUid();

if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis);
}

虽然Handler有很多的发送消息的方法,但最终也是调用了queue.enqueueMessage将消息传入到消息队列中。所以,它的这些方法只是为了方便我们使用而已,将一些参数进行拆分,方便我们快速编码。不管发送时传入的参数是什么,最终都会被构建成一个Message对象,这个Message就是消息的载体,它代表了一个消息。因为消息会有很多很多,为了避免频繁的创建和销毁,它被设计成一个单链表结构,并设计了一个消息池,消息池中存放的是用于重复利用的Message,他们以单链表的形式存在。因此,我们如果要直接发送Message的话,切记不要直接new,而是应该通过Message.obtain方法从消息池中获取,以达成重复利用的目的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final class Message implements Parcelable {
// 用于区分消息类型,用户自定义
public int what;

// 消息可携带简单参数
public int arg1;
public int arg2;

// 消息可携带复杂对象
public Object obj;

// 标记当前消息是否正在被使用
@UnsupportedAppUsage
/*package*/ int flags;

// 消息具体的需要被执行的时间点,消息队列中以此参数排序
@UnsupportedAppUsage
@VisibleForTesting(visibility = VisibleForTesting.Visibility.PACKAGE)
public long when;

// 消息应该由哪个Handler处理执行
@UnsupportedAppUsage
/*package*/ Handler target;

// 当前消息是一个可执行的代码块逻辑
@UnsupportedAppUsage
/*package*/ Runnable callback;

// 单链表指针
@UnsupportedAppUsage
/*package*/ Message next;
}

消息的结构比较简单,一个用于区分消息的参数,三个用于携带数据的参数,以及一个用于存储可执行代码块的参数,这里不需要详细解释,接下来继续看消息进入消息队列的逻辑。注意这里的消息队列MessageQueue只是名字叫做消息队列,它的结构实际不是一个队列的数据结构,而是一个以消息执行时间排序的Message为节点的单链表结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
boolean enqueueMessage(Message msg, long when) {
// 必须有target,正常发送的消息都有
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
synchronized (this) {
// message已经被用过了,没有被回收
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}
// 正在退出looper,直接回收msg
if (mQuitting) {
IllegalStateException e = new IllegalStateException(
msg.target + " sending message to a Handler on a dead thread");
Log.w(TAG, e.getMessage(), e);
msg.recycle();
return false;
}
// 标记为已使用的状态,并设置消息的执行时机
msg.markInUse();
msg.when = when;
Message p = mMessages;
// 是否需要唤醒Looper
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// 队列中无数据,插入到表头
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
// 遍历消息队列,按时间顺序插入消息
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
// 需要执行的消息比当前消息早,则插入到它前面
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
// 插入到消息的前面
msg.next = p;
prev.next = msg;
}
// 唤醒Looper
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}

整体看下来,在消息入队列时做了两件事:一是按照Message.when的时间顺序将新的Message插入到消息队列中,一是决定是否唤醒Looper线程。如果消息队列中没有消息并且当前Looper线程是阻塞的,则唤醒Looper。另外如果消息队列中有消息,但是Looper是阻塞的并且当前的消息是异步消息,则也唤醒Looper

前面我们看到的MessageQueue插入数据的逻辑,实际上是发生在Handler.sendMessage所在的线程。因为我们开启Looper循环后,Looper所在的线程就会一直循环从MessageQueue中取数据,取不到时就会阻塞。因此,在MQ收到消息后,会选择是否唤起Looper来处理新来的消息。重新看回Looper.loop方法:

1
2
3
4
5
6
7
8
public static void loop() {
...
for (;;) {
if (!loopOnce(me, ident, thresholdOverride)) {
return;
}
}
}

也就是说,Looper开启后就会一直死循环读取,而读取的逻辑则是发生在loopOnce中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private static boolean loopOnce(final Looper me,
final long ident, final int thresholdOverride) {
// 从消息队列中获取一个消息,可能会阻塞
Message msg = me.mQueue.next();
// 按照正常逻辑,msg不可能为空,因为获取不到消息时会阻塞,只有looper退出时才会返回null
// 当msg为空,也就代表着loop方法的结束,整个消息机制的结束。
if (msg == null) {
return false;
}

// 通过Looper#setMessageLogging可以设置日志打印类,可以打印出每个消息的执行情况,
// 我们可以通过这个记录每个Message的执行实际,执行耗时等信息,可用于监控线程是否卡顿
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " "
+ msg.callback + ": " + msg.what);
}
// 这个类才是监控消息执行的,在消息执行前、执行后、以及发生异常时,都会有回调到
// observer中,这个类可以帮助我们监控消息队列的执行情况,比前面的logging更好用。
// 但是,它是hide的,我们正常情况下无法使用
final Observer observer = sObserver;
...
if (observer != null) {
// 消息执行前的回调
token = observer.messageDispatchStarting();
}
try {
// 取出消息对应的handler,通过dispatchMessage处理消息
msg.target.dispatchMessage(msg);
// 消息执行后的回调
if (observer != null) {
observer.messageDispatched(token, msg);
}
} catch (Exception exception) {
// 异常时的回调
if (observer != null) {
observer.dispatchingThrewException(token, msg, exception);
}
throw exception;
} finally {
...
}
// 消息执行后的logging打印
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
// 回收消息,会将消息状态重置,并标记未使用,然后放回到消息池中等待复用
msg.recycleUnchecked();
// 返回true,因此在looper中会再次调用到loopOnce方法
return true;
}

前面的loopOnce从名字也能看出来是循环一次的意思,实际它的逻辑也是如此。首先通过MessageQueue.next取出一个消息,注意该消息一定不为空,因为取不到消息的时候会阻塞住,直到取到消息才会返回,当然如果Looper被退出的话是会返回null的,然后就是通过Message对应的targetdispatchMessage。从这里也能看到线程是如何切换的了,首先在其他线程通过Handler发送消息,这个消息会是一个数据或者一个代码块Runnable,然后被包装成Message,最终在Looper线程中通过Handler.dispatchMessage处理。

因此我们接下来的关注点有两点:一是MessageQueue.next是如何取消息的,一是Handler.dispatchMessage是如何处理消息的,先看next

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
Message next() {
// native的指针,用于阻塞和唤醒的
final long ptr = mPtr;
if (ptr == 0) {
return null;
}
// idleHandler的个数,用来控制避免重复执行的
int pendingIdleHandlerCount = -1;

// 阻塞时间,当没有消息时取值为-1,表示一直阻塞;有消息时但是不能执行,说明该消息的
// 执行时间是在未来的,因此取值为msg.when - now
int nextPollTimeoutMillis = 0;

// 这里也是一个死循环,说明如果取不到数据,是会继续循环去取,直到取到message为止,
// 因此说这个方法的返回值不会为null,除非Looper被quit才会返回null。
for (; ; ) {
// 阻塞一段时间nextPollTimeoutMillis,第一次的时候该值为0不会被阻塞,
// 后续就可能会被阻塞住,直到第一个消息的执行时间点到达
nativePollOnce(ptr, nextPollTimeoutMillis);

synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// target为空,说明msg是一个消息屏障
if (msg != null && msg.target == null) {
// 遇到消息屏障后,该屏障后的所有普通消息不再执行,但是异步消息还是会执行的,
// 因此这里使用循环查找消息屏障后的第一个异步消息返回
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
// 找到了消息,注意msg的初始值是mMessage,也就是消息队列的第一个值,因此
// 走到这里说明拿到了一个可能是正常消息,也可能是异步消息的消息
if (now < msg.when) {
// 消息的执行时间未到,则计算需要阻塞的时间
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 找到了消息并且可以执行,标记为非阻塞状态
mBlocked = false;
// 将消息从消息队列中取出来
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
msg.markInUse();
return msg;
}
} else {
// 未找到消息,则将阻塞时间设置为一直阻塞
nextPollTimeoutMillis = -1;
}
// 只有Looper.quit时,才会返回null
if (mQuitting) {
dispose();
return null;
}
...
}

// 执行pendingIntent
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // release the reference to the handler
boolean keep = false;
try {
keep = idler.queueIdle();
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
...
}
}

上面的逻辑比较长,详细的注释也在其中了。这里有一段逻辑是当Messagetarget为空时,后面只会去取isAsynchronous的异步消息,这里这种target为空的消息被称为消息屏障,它的作用就是屏蔽它之后的消息,但是无法屏蔽异步消息。整体逻辑就是:取第一个Message,如果是消息屏障的话,继续找它后面的异步消息,反正就是找到最近的一个消息,然后消息不能执行的话就阻塞,否则就返回。

最后还有一个IdleHandler,它是空闲消息,当消息队列处于空闲状态时才会执行。空闲状态是指:消息队列中没有消息、消息队列中有消息但是未到执行时间、消息队列中有消息但是被消息屏障给屏蔽了并且没有可执行的异步消息。注意这里的next方法中取消息的逻辑是一个死循环:先取消息,没有可执行的消息时会计算需要阻塞的时长,然后再去执行IdleHandler消息;然后再到循环时,会再次找消息并计算阻塞时长,然后受pendingIdleHandlerCount参数的影响这次不会再执行IdleHandler了;然后再次循环进去阻塞状态。之所以这样循环,是因为IdleHandler也是执行在Looper线程的,考虑到它的执行可能会消耗时间,因此需要在它执行之后重新计算阻塞时长。

接下来再看看Handler是如何执行消息的,即msg.target,.dispatchMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void dispatchMessage(@NonNull Message msg) {
// 如果callback不为空,说明消息是个可执行代码块,直接执行即可
if (msg.callback != null) {
handleCallback(msg);
} else {
// 可以给Handler设置一个处理消息的callback,如果执行消息的话,就不会再去分发了
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
// 最后的执行消息的地方,是个空方法
handleMessage(msg);
}
}

private static void handleCallback(Message message) {
message.callback.run();
}

普通消息会被分为数据消息和代码块消息,代码块的消息会被直接执行掉,从而实现了跨线程,因为代码块是在其他线程发送的,但是执行时却是在Looper的线程执行的。而数据消息则是由用户自己去处理,使用示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 使用方式一,构造Handler时传入Callback进行处理消息
private Handler mHandler = new Handler(Looper.getMainLooper(), new Handler.Callback() {
@Override
public boolean handleMessage(@NonNull Message msg) {
boolean consumed = false;
switch (msg.what) {
case 1:
consumed = true;
break;
default:
break;
}
return consumed;
}
});

// 使用方式二,使用匿名内部类,重写handlerMessage方法来处理消息
private Handler mHandler2 = new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(@NonNull Message msg) {
switch (msg.what) {
case 1:
consumed = true;
break;
default:
break;
}
}
};

当然,如果不发送数据消息的话,直接使用new Handler(Looper)即可。

小结

在Handler消息机制中,绑定了LooperHandler可以在任意的线程中向MessageQueue中发送消息,Looper负责循环驱动来读取消息,并在自己的线程内处理消息。其中消息分为三类:

  • 普通消息:普通消息又根据msg.isAsynchronous分为异步消息和同步消息,他们之间的差异就是异步消息不会被消息屏障给屏蔽掉。不管是同步消息还是异步消息,他们又根据msg.callback分为代码块消息和数据消息,如果callback不为空说明是代码块消息,会直接执行。数据消息则是由用户自己进行处理,可根据what来区分消息。
  • 消息屏障:msg.target为空就代表它是一个消息屏障,它的作用就是屏蔽它后面的消息。一般用来保证重要消息的执行,如View在绘制时就会发送一个消息屏障屏蔽掉其他消息,以保障绘制的顺利完成。
  • 空闲消息:IdleHandler并不是一个Message,他在MessageQueue中也是单独用一个集合存储的。它只会在消息队列空闲的时候执行,时机无法控制,因此适合处理一些不重要的东西。

Native层源码

前面说到的是Java层的整个机制的源码,可以看到在阻塞和唤醒的地方都是调用的native的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit();
}

Message next() {
...
for (;;) {
nativePollOnce(ptr, nextPollTimeoutMillis);
...
}
}

boolean enqueueMessage(Message msg, long when) {
...
synchronized (this) {
...
if (needWake) {
nativeWake(mPtr);
}
}
return true;
}


private native static long nativeInit();
private native static void nativeDestroy(long ptr);
@UnsupportedAppUsage
private native void nativePollOnce(long ptr, int timeoutMillis); /*non-static for callbacks*/
private native static void nativeWake(long ptr);

可以看到,这一系列的方法都是native方法,源码实现都是在JNI中,并且他们不是通过System.load加载的,而是在native层动态加载的,基本上所有的系统JNI都是在AndroidRuntimeframeworks/base/core/jni/AndroidRuntime.cpp)中动态加载的。我们正常找对应的JNI文件的时候,可以直接全局搜文件名,文件名的规则就是包名+类名,如MessageQueue对应的JNI文件就是android_os_MessageQueue.cppframeworks/base/core/jni/android_os_MessageQueue.cpp)。

找到了对应的文件,那么我们直接看nativeInit方法,它是在构造MessageQueue的时候调用的,返回值是一个long类型的值。

1
2
3
4
5
6
7
8
9
10
11
static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
// 增加强引用,避免被销毁
nativeMessageQueue->incStrong(env);
// 返回地址指针
return reinterpret_cast<jlong>(nativeMessageQueue);
}

即创建了一个NativeMessageQueue,然后增强了它的强引用,避免被销毁,然后返回了地址指针给到Java层。这里的NativeMessageQueue实际并不是个消息队列,它只是名字叫做这个耳机,内部也没什么结构逻辑,只是持有一个sp<Looper>对象。注意这里的Looper并不是Java层的,而是C++Looper

1
2
3
4
5
6
7
8
NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();
if (mLooper == NULL) {
mLooper = new Looper(false);
Looper::setForThread(mLooper);
}
}

看得出来,LooperJava层的Looper是一样的,一个线程中只会存在一个。这里的逻辑就是先去根据当前线程获取Looper,获取不到的话就去创建一个Looper,然后保存起来。Looper的位置在system/core/libutils/Looper.cppsystem/core/include/utils/Looper.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(WAKE_EVENT_FD_SEQ + 1),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// 创建eventfd,用于监听该eventfd来实现阻塞和唤醒
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
// 创建epoll,通过epoll来监听eventfd
rebuildEpollLocked();
}


void Looper::rebuildEpollLocked() {
// 重置旧的epoll
if (mEpollFd >= 0) {
mEpollFd.reset();
}

// 创建一个新的epoll
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
epoll_event wakeEvent = createEpollEvent(EPOLLIN, WAKE_EVENT_FD_SEQ);
// 添加一个eventfd,并设置唤醒状态为eventfd中被写入数据时,即eventfd可读时唤醒
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &wakeEvent);

// 监听其他的eventfd,这里与是其他流程,可以暂时不去关注
for (const auto& [seq, request] : mRequests) {
epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
}
}

看到这里应该就很熟悉了,前面在Handler唤起的基础中详细介绍过eventfdepoll机制,其中epoll是可以监控多个eventfd的,每个fd都能唤醒epoll,这里默认的唤醒的fd是mWakeEventfd。所以初始化的方法也就简洁明了了,创建了一个Looper,然后在Looper中整了eventfdepoll
然后看阻塞的方法,应该会和我们想的一样,通过epoll_wait等待eventfd的状态变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
// 根据java传进来的指针获取到MQ
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);
mPollObj = NULL;
mPollEnv = NULL;

if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}

首先就是在nativeInit的时候构建了NativeMessageQueue,然后将指针传入到了Java层保存在Java层的MessageQueue.mPtr中。后续所有的操作都是与mPtr相关的,从Java再到native,将mPtr指针地址再转换成NativeMessageQueue进行操作。如nativePollOnce就是最终走到Looper.pollOnce

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Looper.h
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}

// Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
...
// 这段逻辑是处理消息的,但是我们传入的后面三个参数都是nullptr,
// 因此不会走到处理消息的逻辑。
...
if (result != 0) {
...
// 退出循环,即此时已经唤醒了
return result;
}
// 进入阻塞,阻塞结束后会再次循环,然后在上面退出循环
result = pollInner(timeoutMillis);
}
}

int Looper::pollInner(int timeoutMillis) {
// Poll.
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;

// 进入阻塞,阻塞结束后返回值就是触发唤醒的个数,唤醒事件会存在eventItems中
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
mPolling = false;

// 发生错误,直接走到Done逻辑块
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
result = POLL_ERROR;
goto Done;
}

// 唤醒的消息数为0,表示是超时引起的唤醒
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}

// 唤醒的消息数不为0,开始处理消息。因为我们是从Java层的MessageQueue中使用的,
// 并没有额外传入eventfd,因此这里即使被唤醒eventCount也只会是1
for (int i = 0; i < eventCount; i++) {
const SequenceNumber seq = eventItems[i].data.u64;
uint32_t epollEvents = eventItems[i].events;
// 添加到epoll时,mWakeEventFd的类型就是WAKE_EVENT_FD_SEQ,因此这里说明epoll被唤醒了
// 并且唤醒的是mWakeEventFd
if (seq == WAKE_EVENT_FD_SEQ) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
// 其他fd唤醒的会构建response,然后存入到mResponse中被looperOnce中的逻辑处理
const auto& request_it = mRequests.find(seq);
if (request_it != mRequests.end()) {
const auto& request = request_it->second;
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
mResponses.push({.seq = seq, .events = events, .request = request});
} else {
...
}
}
}

Done: ;

// Invoke pending message callbacks.
mNextMessageUptime = LLONG_MAX;
...

// 处理其他事件的response
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
...
int callbackResult = response.request.callback->handleEvent(fd, events, data);
...
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}

所以确实是epoll机制,最终走到的就是epoll_wait进入阻塞,等待被唤醒或者超时被唤醒。并且在这里我们看到了别的逻辑,就是Looper不仅仅是给Java层使用的,它除了默认用于唤醒的mWakeEventfd外,还支持添加别的eventfd,并且别的eventfd唤醒epoll后会构建Response来处理这些消息。事实上就是如此,著名的ServiceManager就使用了Looper,然后将Binderfd添加到Looper中来监听Binder的消息,这是额外的话题,以后再说。
阻塞的流程我们看完了,接下来看唤醒的流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// android_os_MessageQueue.cpp
static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}

void NativeMessageQueue::wake() {
mLooper->wake();
}

//Looper.cpp
void Looper::wake() {
uint64_t inc = 1;
// 向mWakeEventfd中写入一个1,然后epoll_wait就会被唤醒了
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}

唤醒流程非常清晰,根据Java层传来的地址找到NativeMessageQueue,最终走到Looper.wake方法来进行唤醒,唤醒的方式也是最简单的向eventfd中写入一个1。

小结

Native层也有一个Looper,他与Java层差不多,也是线程唯一的。当在Java层创建MessageQueue时,也会同时在Native层创建一个NativeMessageQueue并赋值给Java层的mPtr属性,由此将二者进行绑定。然后在NativeMessageQueue创建时,还会创建native层的Looper,后续的阻塞和唤醒都是在native层的Looper中通过eventfdepoll机制进行的。

img/handler.webp

通过前面对Java层的Handler消息机制的分析,我们知道了整体的逻辑:

  1. 初始化:首先在线程中启用JavaLooper,此时会创建JavaMessageQueue用于存储消息,同时会通过nativeInit进入到Native层创建了NativeMessageQueueNativeLooper
  2. 读取:Java层的Looper会进入循环,一直读取MessageQueue中的消息。读不到的时候会通过nativePollOnce进入到native层,然后在Native层的Looper中通过epoll机制进入阻塞状态。
  3. 写入:Java层的Handler发送消息到MessageQueue中,通过nativeWake进入到native层,然后在NativeLooper中向mWakeEventfd写入一个值,用于唤醒epoll的阻塞。唤醒后回到Java层的Looper开始处理消息,然后继续阻塞。

这里我们看到实际的阻塞和唤醒都是在native层实现的,并且Native层还有一个Looper。注意这里的NativeMessageQueue实际是没啥用的,它存在的作用就是连接Java层和Native层,它将native Looper等信息存储在自己的对象中,然后将对象的地址保存在Java中。后续的交互就是Java层根据地址找到NativeMessageQueue进而找到native Looper

从前面的分析我们知道了Looper进入阻塞的时候,是通过epoll机制阻塞的,而熟悉epoll机制(点击查看epoll和eventfd)的我们知道,epoll可以监听多个eventfd,也就是说我们可以向Looper中注册自己的eventfd实现阻塞和唤醒。

前面我们看到在Looper中,会给我们提供一个默认的mWakeEventfd用来唤醒和阻塞,并且这个fd仅仅是用来唤醒和阻塞的。但是我们看到Looper的逻辑中是可以添加自己的eventfd的,然后在被自己的eventfd唤醒的时候还会自动执行操作,类似于一个消息机制了,接下来看看这个是怎么完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
sp<SimpleLooperCallback> looperCallback;
if (callback) {
looperCallback = sp<SimpleLooperCallback>::make(callback);
}
return addFd(fd, ident, events, looperCallback, data);
}

int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
if (!callback.get()) {
...
} else {
ident = POLL_CALLBACK;
}

{ // acquire lock
AutoMutex _l(mLock);
// mWakeEventfd的seq是WAKE_EVENT_FD_SEQ,后续添加的fd都往后排
if (mNextRequestSeq == WAKE_EVENT_FD_SEQ) mNextRequestSeq++;
const SequenceNumber seq = mNextRequestSeq++;

Request request;
// 唤醒后,根据fd来寻找对应的request处理事件
request.fd = fd;
request.ident = ident;
request.events = events;
// 唤醒后由callback处理事件
request.callback = callback;
request.data = data;

epoll_event eventItem = createEpollEvent(request.getEpollEvents(), seq);
auto seq_it = mSequenceNumberByFd.find(fd);
if (seq_it == mSequenceNumberByFd.end()) {
// 添加新的eventfd
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
mRequests.emplace(seq, request);
mSequenceNumberByFd.emplace(fd, seq);
} else {
// eventfd已经添加过了,则修改fd
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
...
const SequenceNumber oldSeq = seq_it->second;
mRequests.erase(oldSeq);
mRequests.emplace(seq, request);
seq_it->second = seq;
}
} // release lock
return 1;
}

在添加fd的时候,要求传入一个LooperCallback,它是用来处理消息的,然后将这些参数封装成Request对象,然后存入到mRequest集合中,并且向epoll中添加对应的eventfd。正常来说,添加完之后就可以开始监听了,有两个监听的方法:ponnOncepollAll,其中pollOnce就是阻塞一次,唤醒后会处理消息,然后就返回了。而pollAll则是循环调用pollOnce,只要pollOnce的返回值是CALLBACK,就继续进入阻塞等待事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int Looper::pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
if (timeoutMillis <= 0) {
int result;
do {
// 只要返回值是POLL_CALLBACK,就一直循环
result = pollOnce(timeoutMillis, outFd, outEvents, outData);
} while (result == POLL_CALLBACK);
return result;
} else {
nsecs_t endTime = systemTime(SYSTEM_TIME_MONOTONIC)
+ milliseconds_to_nanoseconds(timeoutMillis);

for (;;) {
// 同样的逻辑,区别就是设置了超时时间后,会在超时时间到达时返回
int result = pollOnce(timeoutMillis, outFd, outEvents, outData);
if (result != POLL_CALLBACK) {
return result;
}
// 此次唤醒不是超时唤醒,但是在处理完时间后时间到了,也不再阻塞,而是直接返回
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
timeoutMillis = toMillisecondTimeoutDelay(now, endTime);
if (timeoutMillis == 0) {
return POLL_TIMEOUT;
}
}
}
}

其中pollOnce我们前面已经看过了,就是进入阻塞,然后唤醒后构建Response,然后进行处理事件。下面再看一次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
...
if (result != 0) {
...
return result;
}
result = pollInner(timeoutMillis);
}
}

int Looper::pollInner(int timeoutMillis) {
// 准备进入阻塞,先把response清空,后续用于存储唤醒的事件
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;

// 开始阻塞,当被唤醒后,唤醒的事件会存在epoll_event数组中
mPolling = true;
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
mPolling = false;

...
// 遍历唤醒的事件,然后将其封装成Response
for (int i = 0; i < eventCount; i++) {
const SequenceNumber seq = eventItems[i].data.u64;
uint32_t epollEvents = eventItems[i].events;
// 默认的mWakeEventfd的seq是这个,不会封装Response,也不会处理任何事件
if (seq == WAKE_EVENT_FD_SEQ) {
if (epollEvents & EPOLLIN) {
awoken();
}
} else {
// 根据seq找到request,然后构建Response的时候会将二者进行关联
const auto& request_it = mRequests.find(seq);
if (request_it != mRequests.end()) {
const auto& request = request_it->second;
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 封装成Response并添加到mResponse集合中
mResponses.push({.seq = seq, .events = events, .request = request});
}
}
}
Done: ;
...
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
// 直接执行callback,其中fd和data都是添加fd时的参数没有修改,而events表示的是唤醒的事件
int callbackResult = response.request.callback->handleEvent(fd, events, data);
..
result = POLL_CALLBACK;
}
}
return result;
}

所以我们在c++层中,就可以通过添加fd以及对应的callback,当fd发生变化的时候就会唤醒epoll,然后执行callback了。实际上LooperNative层用的更多,像我们熟悉的ServiceManager也使用了Looper机制。