多线程是Android开发中常见的场景,许多原生API和开源项目都涉及到多线程的内容。本文将简单总结和探讨一下常见的多线程切换方式。首先,我们回顾一下Java多线程的几个基础内容,然后再分析总结一些经典代码中对于线程切换的实现方式。

几点基础:

1. 多线程切换主要包括如何开启多个线程、如何定义每个线程的任务以及如何在线程之间互相通信。

2. Thread类可以解决开启多个线程的问题。Thread是Java中实现多线程的线程类,每个Thread对象都可以启动一个新的线程。可以通过以下两种方式启动新线程:

- thread.run(); // 不启动新线程,在当前线程执行

- thread.start(); // 启动新线程。

3. Thread存在线程优先级问题。如果为Thread设置较高的线程优先级,就有机会获得更多的CPU资源。注意这里只是有机会,优先级高的Thread不是必然会先于其他Thread执行,只是系统会倾向于给它分配更多的CPU。默认情况下,新建的Thread和当前Thread的线程优先级一致。设置线程优先级有两种方式:

- thread.setPriority(Thread.MAX_PRIORITY); // 1~10,通过线程设置

- Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); // -20~19,通过进程设置

4. 在Android开发中,一般建议通过Process进程设置优先级。

5. Thread本身是需要占用内存的,开启/销毁过量的工作线程会造成过量的资源损耗。针对这种情况,我们一般会通过对资源的复用来进行优化。针对IO资源,我们会做IO复用(例如Http的KeepAlive);针对内存,我们会做内存池复用(例如Fresco的内存池);针对CPU资源,我们一般会做线程复用,也就是线程池。因此,在Android开发中,一般不会直接开启大量的Thread,而是会使用ThreadPool来复用线程。

6. Runnable主要解决如何定义每个线程的工作任务的问题。Runnable是Java中实现多线程的接口,相对Thread而言,Runnable接口更容易扩展(不需要单继承),而且,Thread本身也是一种Runnable。

在Java中,实现多线程编程有多种方式。相比Thread而言,Runnable不关注如何调度线程,只关心如何定义要执行的工作任务,所以在实际开发中,多使用Runnable接口完成多线程开发。此外,还有Callable和Future等接口可以用于多线程编程。

1. Callable

Callable和Runnable基本类似,但是Callable可以返回执行结果。这意味着我们可以在Callable的call方法中执行一些耗时的操作,并将结果返回给调用者。这样,我们就可以在主线程中获取子线程的执行结果,而不需要使用回调函数或者Future。

2. 线程间通信

虽然Thread和Runnable能实现切换到另一个线程工作(Runnable需要额外指派工作线程),但它们完成任务后就会退出,并不注重如何在线程间实现通信。因此,在切换线程时,还需要在线程间通信,这就需要一些线程间通信机制。常见的线程间通信机制有:wait()、notify()、notifyAll()、join()、sleep()、yield()等方法。

3. Future

一般来说,如果要做简单的通信,我们最常用的是通过接口回调来实现。Future就是这样一种接口,它可以部分地解决线程通信的问题。Future接口定义了done、canceled等回调函数,当工作线程的任务完成时,它会(在工作线程中)通过回调告知我们,我们再采用其他手段通知其他线程。例如:

```java

Future mFuture = new FutureTask(runnable) { @Override protected void done() { ...//还是在工作线程里 } };

```

4. Condition

Condition其实是和Lock一起使用的,但如果把它视为一种线程间通信的工具,也说的通。因为,Condition本身定位就是一种多线程间协调通信的工具,Condition可以在某些条件下,唤醒等待线程。例如:

```java

Lock lock = new ReentrantLock();

Condition notFull = lock.newCondition(); //定义Lock的Condition

while (count == items.length)

notFull.await();//等待condition的状态

notFull.signal();//达到condition的状态

```

5. Handler

Handler主要用于Android平台上的异步消息处理。在Android中,可以使用Handler来实现子线程与主线程之间的通信。例如:

```java

Handler mHandler = new Handler(Looper.getMainLooper()) {

@Override

public void handleMessage(Message msg) {

//处理消息

} };

AsyncTask的多线程切换

在回顾了多线程的基本概念之后,我们来看一下简单的多线程切换——Android自带的AsyncTask。AsyncTask主要在doInBackground函数中定义工作线程的工作内容,在其他函数中定义主线程的工作内容,例如onPostExecute。这里涉及到两个问题:1. 如何实现把doInBackground抛给工作线程;2. 如何实现把onPostExecute抛给主线程。实际上,解决这两个问题非常简单。

首先,我们来看如何实现把doInBackground抛给工作线程。在使用AsyncTask时,我们通常会创建一个基于AsyncTask的扩展类或匿名类,在其中实现几个抽象函数,例如:

```java

private class MyTask extends AsyncTask {

@Override

protected void onPreExecute() {... }

@Override

protected Long doInBackground(String... params) {... }

@Override

protected void onProgressUpdate(Object... values) {... }

@Override

protected void onPostExecute(Long aLong) {... }

@Override

protected void onCancelled() {... }

}

```

接下来,我们实例化这个AsyncTask:

```java

MyTask mTask = new MyTask();

```

在AsyncTask源码中,我们可以看到,构造函数里会创建一个WorkerRunnable对象:

```java

protected void execute(Params... params) {

mWorker.mParams = params;

mWorker.mCurrPage = 0;

mWorker.mLastPage = Integer.MAX_VALUE;

mWorker.mOutputList = new ArrayList<>();

mWorker.mTaskListener = this;

mExecuting = true;

mWorkerThread = new Thread(mWorker);

mWorkerThread.start();

}

```

在这个过程中,我们可以看到,AsyncTask通过创建一个WorkerRunnable对象并将其传递给新创建的线程来实现将doInBackground抛给工作线程。这样,工作线程就可以在doInBackground函数中执行耗时操作,而主线程则可以在onPostExecute或其他函数中处理结果。

public AsyncTask() { mWorker = new WorkerRunnable() { //这是一个Callable

public Result call() throws Exception {

...

result = doInBackground(mParams); //在工作线程中执行

...

}

};

}

实际上,WorkerRunnable是一个Callable对象。因此,doInBackground被包装在一个Callable对象中,这个Callable还会继续被包装,最终被交给一个线程池去执行:

```java

Runnable mActive;

...

if ((mActive = mTasks.poll()) != null) {

THREAD_POOL_EXECUTOR.execute(mActive); //交给线程池执行

}

```

梳理一下,大致过程为:定义doInBackground -> 被一个Callable调用 -> 层层包为一个Runnable -> 交给线程池执行。这样就解决了第一个问题,如何实现把doInBackground抛给工作线程。我们再来看第二个问题。

2. 如何实现把onPostExecute抛给主线程?

首先,我们需要知道工作任务何时执行完毕,这就需要在工作完成时触发一个接口回调,也就是前面提到的Future。还是看AsyncTask源码:

以下是重构后的代码:

public AsyncTask() {

mWorker = new WorkerRunnable() {

@Override

public Result doInBackground(Params... params) {

try {

return onPreExecute();

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

@Override

protected void onPostExecute(Result result) {

//处理onPostExecute函数,如果有需要可以在这里更新UI等操作

if (result != null) {

postResultIfNotInvoked(result); //postResultIfNotInvoked是自定义的方法,用于将结果传递出去

} else {

throw new RuntimeException("执行任务时发生异常!");

}

}

};

mFuture = new FutureTask<>(mWorker) {

@Override

protected void done() {//Future的回调

try {

get(); //get()是FutureTask接口函数

} catch (Exception e) {

e.printStackTrace();

} finally {

//无论是否成功都要调用onCancelled来取消注册

onCancelled();

}

}

};

}

```java

private Result postResult(Result result) {

@SuppressWarnings("unchecked")

Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,

new AsyncTaskResult(this, result));

message.sendToTarget();

return result;

}

// getHandler()会指向InternalHandler

private static class InternalHandler extends Handler {

public InternalHandler() {

super(Looper.getMainLooper()); //指向MainLooper

}

@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})

@Override

public void handleMessage(Message msg) {

AsyncTaskResult result = (AsyncTaskResult) msg.obj;

switch (msg.what) {

case MESSAGE_POST_RESULT:

// There is only one result

result.mTask.finish(result.mData[0]); //通过handler机制,回到主线程,调用finish函数

...

}

// 在Handler中,最终会在主线程中调用finish

private void finish(Result result) {

if (isCancelled()) {

onCancelled(result);

} else {

onPostExecute(result); //调用onPostExecute接口函数

}

mStatus = Status.FINISHED;

}

}

```

从源码来看,AsyncTask实际上是通过Handler机制将任务抛给主线程执行的。总体来说,AsyncTask的多线程任务是通过线程池实现的工作线程来完成的。在任务完成后,利用Future的done回调通知任务已完成,并通过handler机制通知主线程执行onPostExecute等回调函数。

EventBus的多线程切换:

EventBus会为每个订阅事件注册一个目标线程。因此,需要从发布事件的线程中实时切换到目标线程。这是一个典型的多线程切换场景。根据EventBus的源码,多线程切换的主要判断代码如下:

请根据以下内容完成重构,并保持段落结构:

```java

switch (subscription.subscriberMethod.threadMode) {

case POSTING:

invokeSubscriber(subscription, event);//直接在当前线程执行

break;

case MAIN:

if (isMainThread) {

invokeSubscriber(subscription, event);//在当前主线程执行

} else {

mainThreadPoster.enqueue(subscription, event);//当然不是主线程,交给主线程执行

}

break;

case BACKGROUND:

if (isMainThread) {

backgroundPoster.enqueue(subscription, event);//当前线程为主线程,交给工作线程

} else {

invokeSubscriber(subscription, event);//直接在当前工作线程执行

}

break;

case ASYNC:

asyncPoster.enqueue(subscription, event);//异步执行

break;

default:

throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);

}

```

EventBus 是一个用于实现 Android 应用程序中不同组件之间进行解耦和通信的工具。在 EventBus 中,如果需要在线程间切换,主要是通过抛给不同的任务队列来实现线程间切换。任务队列的设计可以有以下三种切换目标:主线程 Poster、backgroundPoster 和 asyncPoster。

EventBus 不能直接判断哪些任务会并行执行,因此采用了队列的设计。多线程任务(EventBus 的事件)会先进入队列,然后再处理队列中的工作任务,这是典型的生产-消费场景。主线程 Poster、backgroundPoster 和 asyncPoster 都是任务队列的不同实现。

1. 主线程 Poster:

负责处理主线程的 mainThreadPoster 是 Handler 的子类,其源码如下:

```java

final class HandlerPoster extends Handler { ...

void enqueue(Subscription subscription, Object event) {

...

synchronized (this) {

// 因为主线程只有一个,需要线程安全

queue.enqueue(pendingPost);

...

if (!sendMessage(obtainMessage())) {

// 作为 handler 发送消息

...

}

}

}

// 在主线程中处理消息

@Override

public void handleMessage(Message msg) {

...

}

}

```

从源码可以看出,这个 Poster 其实是一个 Handler,它采用了哪个线程的消息队列,就决定了它能和哪个线程通信。例如:

```java

EventBus(EventBusBuilder builder) { ...

mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10); // 获取主线程的 MainLooper

}

```

2. backgroundPoster:

负责处理后台线程的 backgroundPoster 是类似于上述主线程 Poster 的实现,其源码与主线程 Poster 类似。主要区别在于它是在子线程中处理消息。例如:

```java

private static final class BackgroundPoster extends Handler { ...

}

```

3. asyncPoster:

负责处理异步任务的 asyncPoster 也是类似于上述主线程 Poster 的实现,其源码与主线程 Poster 类似。主要区别在于它是在单独的异步线程中处理消息。例如:

```java

private static final class AsyncPoster extends Handler { ...

}

```

EventBus是一个扩展了Handler的类,它通过Handler的消息机制实现了多线程切换。在这个过程中,Handler本身还增加了一个队列层。

EventBus使用了两个线程池:backgroundPoster和asyncPoster。这两个线程池都使用了默认的缓存线程池:DEFAULT_EXECUTOR_SERVICE,即Executors.newCachedThreadPool()。这意味着它们都将任务交给线程池处理,从而实现多线程切换。

尽管backgroundPoster和asyncPoster在功能上相似,但它们之间还是存在一些差异。首先,在newCachedThreadPool中,最大线程数就是Integer的最大值,相当于不设上限,因此可以尽可能多地启动线程。这就是为什么asyncPoster在enqueue和run方法中都没有进行同步操作,而是为每个事件单独开启新线程处理的原因。

而在backgroundPoster中,为了尽量复用线程,主要方法是在run方法中加入一个1秒的等待时间:

```java

@Override public void run() { ... PendingPost pendingPost = queue.poll(1000);//允许等待1秒

```

由于进行了这一秒的挂起等待,所以在enqueue和run方法中都需要使用synchronized关键字来确保线程安全。

此外,还有一个重要的用法是Executors.newCachedThreadPool()中的SynchronousQueue。

SynchronousQueue是一个阻塞队列,它不是数据等线程,而是线程等数据。这样每次向SynchronousQueue里传入数据时,都会立即交给一个线程执行,这样可以提高数据得到处理的速度。EventBus采用线程池实现工作线程,采用handler机制通知到主线程。不同在于,它采用的queue的队列方式来管理所有的跨线程请求,而且它利用了SynchronousQueue阻塞队列来辅助实现线程切换。RxJava的多线程切换能力非常令人赞叹。RxJava的主要概念是工作流,它可以把一序列工作流定义在一个线程类型上 。

构建工作流的过程相当复杂,但如果我们只关注线程操作部分,流程会变得非常清晰。让我们跟踪一下subscribeOn的源码(RxJava2):

```java

public final Flowable subscribeOn(@NonNull Scheduler scheduler) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

return subscribeOn(scheduler, !(this instanceof FlowableCreate));

}

public final Flowable subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {

ObjectHelper.requireNonNull(scheduler, "scheduler is null");

return RxJavaPlugins.onAssembly(new FlowableSubscribeOn(this, scheduler, requestOn));

}

```

接下来,我们进入FlowableSubscribeOn类:

在这个类中,我们可以看到subscribeOn方法接受一个Scheduler参数和一个布尔值requestOn。Scheduler参数用于指定执行任务的线程调度器,而requestOn参数则表示是否在订阅时立即执行任务。

首先,我们使用ObjectHelper.requireNonNull方法检查传入的scheduler参数是否为null,如果为null则抛出异常。然后,根据传入的scheduler和requestOn参数创建一个新的FlowableSubscribeOn对象,并调用RxJavaPlugins.onAssembly方法进行组装。最后,返回组装好的Flowable对象。

```java

// 进入FlowableSubscribeOn类

public FlowableSubscribeOn(Flowable source, Scheduler scheduler, boolean nonScheduledRequests) {

...

this.scheduler = scheduler;

...

}

@Override

public void subscribeActual(final Subscriber s) {

Scheduler.Worker w = scheduler.createWorker(); // 根据参数值,如Schedulers.io()创建worker

final SubscribeOnSubscriber sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests); // 根据worker创建SubscribeOnSubscriber

s.onSubscribe(sos);

w.schedule(sos);

}

```

其中,SubscribeOnSubscriber是一个内部类。

SubscribeOnSubscriber 类的构造方法接收一个 Subscriber 类型的参数 actual,一个 Scheduler.Worker 类型的参数 worker,一个 Publisher 类型的参数 source,以及一个布尔类型的参数 requestOn。

在构造方法中,首先对传入的实际订阅者进行类型检查,确保它是 Subscriber 类型。然后将传入的工作线程赋值给 this.worker。最后调用 super.onSubscribe(actual) 方法,通知实际订阅者已经准备好接收数据。

在 requestUpstream 方法中,首先创建一个 Request 对象,将传入的订阅信息和请求的数量封装在其中。然后调用 worker.schedule() 方法,将 Request 对象提交到工作线程中执行。这里的 worker.schedule() 方法会根据传入的 Request 对象来决定如何执行任务(Request 是一个实现了 Runnable 接口的对象)。

```java

// ioreactivex.schedulers.Schedulers源码

public class SchedulersUtils {

static {

SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

IO = RxJavaPlugins.initIoScheduler(new IOTask());

TRAMPOLINE = TrampolineScheduler.instance();

NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());

}

// ...省略其他代码...

interface Callable {

T call() throws Exception;

}

static final class IOTask implements Callable {

@Override

public Scheduler call() throws Exception {

return IoHolder.DEFAULT;

}

}

static final class NewThreadTask implements Callable {

@Override

public Scheduler call() throws Exception {

return NewThreadHolder.DEFAULT;

}

}

static final class SingleTask implements Callable {

@Override

public Scheduler call() throws Exception {

return SingleHolder.DEFAULT;

}

}

static final class ComputationTask implements Callable {

@Override

public Scheduler call() throws Exception {

return ComputationHolder.DEFAULT;

}

}

static final class SingleHolder {

static final Scheduler DEFAULT = new SingleScheduler();

}

static final class ComputationHolder {

static final Scheduler DEFAULT = new ComputationScheduler();

}

static final class IoHolder {

static final Scheduler DEFAULT = new IoScheduler();

}

static final class NewThreadHolder {

static final Scheduler DEFAULT = new NewThreadScheduler();

}

}

```

以下是重构后的代码,保持了原有的段落结构:

```java

// io.reactivex.internal.schedulers.IoScheduler源码

static final class EventLoopWorker extends Scheduler.Worker {

// Scheduler.Worker的实现类

// ...

@NonNull

@Override

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {

if (tasks.isDisposed()) {

// 如果已经被取消订阅,则不进行调度

return EmptyDisposable.INSTANCE;

}

return threadWorker.scheduleActual(action, delayTime, unit, tasks); // 将任务交给线程池处理

}

}

```

在这段代码中,我们可以看到IO操作最终会指向一个Scheduler,例如IoScheduler。当调用Android主线程(AndroidSchedulers.mainThread)时,实际上仍然使用了Handler机制:

```java

public final class AndroidSchedulers {

// ...

static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

}

```

这里的HandlerScheduler实际上就是实现了Scheduler和Scheduler.Worker内部类。

HandlerScheduler是RxJava提供的一个调度器,它继承自Scheduler类。HandlerScheduler的主要作用是将任务交给Handler的Worker去做,而这个Worker是根据定义的线程来实现了不同的线程池,实际上还是交给线程池去处理了。至于主线程,RxJava也使用了Handler机制。

在HandlerScheduler中,我们可以看到一个内部类:HandlerWorker,它继承自Worker类。Worker类是用于管理线程池中的线程的,而HandlerWorker则是用于处理消息的。在这个内部类中,我们可以看到一个重写的schedule方法,该方法接受一个Runnable对象、一个延迟时间和一个时间单位作为参数。在这个方法中,我们首先创建了一个Message对象,然后将其发送给Handler,并设置了一个最大延迟时间。这样,当延迟时间到达时,Handler就会调用message.replyTo.send方法来处理这个消息。

总结一下,Android中的多线程切换主要使用Runnable和Callable来定义工作内容,使用线程池来实现异步并行,使用Handler机制来通知主线程,有些场景下会视情况需要使用Future接口回调或使用SynchronousQueue阻塞队列等。