多线程是Android开发中常见的场景,许多原生API和开源项目都涉及到多线程的内容。本文将简单总结和探讨一下常见的多线程切换方式。首先,我们回顾一下Java多线程的几个基础内容,然后再分析总结一些经典代码中对于线程切换的实现方式。
几点基础:
1. 多线程切换主要包括如何开启多个线程、如何定义每个线程的任务以及如何在线程之间互相通信。
2. Thread类可以解决开启多个线程的问题。Thread是Java中实现多线程的线程类,每个Thread对象都可以启动一个新的线程。可以通过以下两种方式启动新线程:
- thread.run(); // 不启动新线程,在当前线程执行
- thread.start(); // 启动新线程。
3. Thread存在线程优先级问题。如果为Thread设置较高的线程优先级,就有机会获得更多的CPU资源。注意这里只是有机会,优先级高的Thread不是必然会先于其他Thread执行,只是系统会倾向于给它分配更多的CPU。默认情况下,新建的Thread和当前Thread的线程优先级一致。设置线程优先级有两种方式:
- thread.setPriority(Thread.MAX_PRIORITY); // 1~10,通过线程设置
- Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND); // -20~19,通过进程设置
4. 在Android开发中,一般建议通过Process进程设置优先级。
5. Thread本身是需要占用内存的,开启/销毁过量的工作线程会造成过量的资源损耗。针对这种情况,我们一般会通过对资源的复用来进行优化。针对IO资源,我们会做IO复用(例如Http的KeepAlive);针对内存,我们会做内存池复用(例如Fresco的内存池);针对CPU资源,我们一般会做线程复用,也就是线程池。因此,在Android开发中,一般不会直接开启大量的Thread,而是会使用ThreadPool来复用线程。
6. Runnable主要解决如何定义每个线程的工作任务的问题。Runnable是Java中实现多线程的接口,相对Thread而言,Runnable接口更容易扩展(不需要单继承),而且,Thread本身也是一种Runnable。
在Java中,实现多线程编程有多种方式。相比Thread而言,Runnable不关注如何调度线程,只关心如何定义要执行的工作任务,所以在实际开发中,多使用Runnable接口完成多线程开发。此外,还有Callable和Future等接口可以用于多线程编程。
1. Callable
Callable和Runnable基本类似,但是Callable可以返回执行结果。这意味着我们可以在Callable的call方法中执行一些耗时的操作,并将结果返回给调用者。这样,我们就可以在主线程中获取子线程的执行结果,而不需要使用回调函数或者Future。
2. 线程间通信
虽然Thread和Runnable能实现切换到另一个线程工作(Runnable需要额外指派工作线程),但它们完成任务后就会退出,并不注重如何在线程间实现通信。因此,在切换线程时,还需要在线程间通信,这就需要一些线程间通信机制。常见的线程间通信机制有:wait()、notify()、notifyAll()、join()、sleep()、yield()等方法。
3. Future
一般来说,如果要做简单的通信,我们最常用的是通过接口回调来实现。Future就是这样一种接口,它可以部分地解决线程通信的问题。Future接口定义了done、canceled等回调函数,当工作线程的任务完成时,它会(在工作线程中)通过回调告知我们,我们再采用其他手段通知其他线程。例如:
```java
Future
```
4. Condition
Condition其实是和Lock一起使用的,但如果把它视为一种线程间通信的工具,也说的通。因为,Condition本身定位就是一种多线程间协调通信的工具,Condition可以在某些条件下,唤醒等待线程。例如:
```java
Lock lock = new ReentrantLock();
Condition notFull = lock.newCondition(); //定义Lock的Condition
while (count == items.length)
notFull.await();//等待condition的状态
notFull.signal();//达到condition的状态
```
5. Handler
Handler主要用于Android平台上的异步消息处理。在Android中,可以使用Handler来实现子线程与主线程之间的通信。例如:
```java
Handler mHandler = new Handler(Looper.getMainLooper()) {
@Override
public void handleMessage(Message msg) {
//处理消息
} };
AsyncTask的多线程切换
在回顾了多线程的基本概念之后,我们来看一下简单的多线程切换——Android自带的AsyncTask。AsyncTask主要在doInBackground函数中定义工作线程的工作内容,在其他函数中定义主线程的工作内容,例如onPostExecute。这里涉及到两个问题:1. 如何实现把doInBackground抛给工作线程;2. 如何实现把onPostExecute抛给主线程。实际上,解决这两个问题非常简单。
首先,我们来看如何实现把doInBackground抛给工作线程。在使用AsyncTask时,我们通常会创建一个基于AsyncTask的扩展类或匿名类,在其中实现几个抽象函数,例如:
```java
private class MyTask extends AsyncTask
@Override
protected void onPreExecute() {... }
@Override
protected Long doInBackground(String... params) {... }
@Override
protected void onProgressUpdate(Object... values) {... }
@Override
protected void onPostExecute(Long aLong) {... }
@Override
protected void onCancelled() {... }
}
```
接下来,我们实例化这个AsyncTask:
```java
MyTask mTask = new MyTask();
```
在AsyncTask源码中,我们可以看到,构造函数里会创建一个WorkerRunnable对象:
```java
protected void execute(Params... params) {
mWorker.mParams = params;
mWorker.mCurrPage = 0;
mWorker.mLastPage = Integer.MAX_VALUE;
mWorker.mOutputList = new ArrayList<>();
mWorker.mTaskListener = this;
mExecuting = true;
mWorkerThread = new Thread(mWorker);
mWorkerThread.start();
}
```
在这个过程中,我们可以看到,AsyncTask通过创建一个WorkerRunnable对象并将其传递给新创建的线程来实现将doInBackground抛给工作线程。这样,工作线程就可以在doInBackground函数中执行耗时操作,而主线程则可以在onPostExecute或其他函数中处理结果。
public AsyncTask() { mWorker = new WorkerRunnable
public Result call() throws Exception {
...
result = doInBackground(mParams); //在工作线程中执行
...
}
};
}
实际上,WorkerRunnable是一个Callable对象。因此,doInBackground被包装在一个Callable对象中,这个Callable还会继续被包装,最终被交给一个线程池去执行:
```java
Runnable mActive;
...
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive); //交给线程池执行
}
```
梳理一下,大致过程为:定义doInBackground -> 被一个Callable调用 -> 层层包为一个Runnable -> 交给线程池执行。这样就解决了第一个问题,如何实现把doInBackground抛给工作线程。我们再来看第二个问题。
2. 如何实现把onPostExecute抛给主线程?
首先,我们需要知道工作任务何时执行完毕,这就需要在工作完成时触发一个接口回调,也就是前面提到的Future。还是看AsyncTask源码:
以下是重构后的代码:
public AsyncTask() {
mWorker = new WorkerRunnable
@Override
public Result doInBackground(Params... params) {
try {
return onPreExecute();
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
protected void onPostExecute(Result result) {
//处理onPostExecute函数,如果有需要可以在这里更新UI等操作
if (result != null) {
postResultIfNotInvoked(result); //postResultIfNotInvoked是自定义的方法,用于将结果传递出去
} else {
throw new RuntimeException("执行任务时发生异常!");
}
}
};
mFuture = new FutureTask<>(mWorker) {
@Override
protected void done() {//Future的回调
try {
get(); //get()是FutureTask接口函数
} catch (Exception e) {
e.printStackTrace();
} finally {
//无论是否成功都要调用onCancelled来取消注册
onCancelled();
}
}
};
}
```java
private Result postResult(Result result) {
@SuppressWarnings("unchecked")
Message message = getHandler().obtainMessage(MESSAGE_POST_RESULT,
new AsyncTaskResult
message.sendToTarget();
return result;
}
// getHandler()会指向InternalHandler
private static class InternalHandler extends Handler {
public InternalHandler() {
super(Looper.getMainLooper()); //指向MainLooper
}
@SuppressWarnings({"unchecked", "RawUseOfParameterizedType"})
@Override
public void handleMessage(Message msg) {
AsyncTaskResult> result = (AsyncTaskResult>) msg.obj;
switch (msg.what) {
case MESSAGE_POST_RESULT:
// There is only one result
result.mTask.finish(result.mData[0]); //通过handler机制,回到主线程,调用finish函数
...
}
// 在Handler中,最终会在主线程中调用finish
private void finish(Result result) {
if (isCancelled()) {
onCancelled(result);
} else {
onPostExecute(result); //调用onPostExecute接口函数
}
mStatus = Status.FINISHED;
}
}
```
从源码来看,AsyncTask实际上是通过Handler机制将任务抛给主线程执行的。总体来说,AsyncTask的多线程任务是通过线程池实现的工作线程来完成的。在任务完成后,利用Future的done回调通知任务已完成,并通过handler机制通知主线程执行onPostExecute等回调函数。
EventBus的多线程切换:
EventBus会为每个订阅事件注册一个目标线程。因此,需要从发布事件的线程中实时切换到目标线程。这是一个典型的多线程切换场景。根据EventBus的源码,多线程切换的主要判断代码如下:
请根据以下内容完成重构,并保持段落结构:
```java
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);//直接在当前线程执行
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);//在当前主线程执行
} else {
mainThreadPoster.enqueue(subscription, event);//当然不是主线程,交给主线程执行
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);//当前线程为主线程,交给工作线程
} else {
invokeSubscriber(subscription, event);//直接在当前工作线程执行
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);//异步执行
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
```
EventBus 是一个用于实现 Android 应用程序中不同组件之间进行解耦和通信的工具。在 EventBus 中,如果需要在线程间切换,主要是通过抛给不同的任务队列来实现线程间切换。任务队列的设计可以有以下三种切换目标:主线程 Poster、backgroundPoster 和 asyncPoster。
EventBus 不能直接判断哪些任务会并行执行,因此采用了队列的设计。多线程任务(EventBus 的事件)会先进入队列,然后再处理队列中的工作任务,这是典型的生产-消费场景。主线程 Poster、backgroundPoster 和 asyncPoster 都是任务队列的不同实现。
1. 主线程 Poster:
负责处理主线程的 mainThreadPoster 是 Handler 的子类,其源码如下:
```java
final class HandlerPoster extends Handler { ...
void enqueue(Subscription subscription, Object event) {
...
synchronized (this) {
// 因为主线程只有一个,需要线程安全
queue.enqueue(pendingPost);
...
if (!sendMessage(obtainMessage())) {
// 作为 handler 发送消息
...
}
}
}
// 在主线程中处理消息
@Override
public void handleMessage(Message msg) {
...
}
}
```
从源码可以看出,这个 Poster 其实是一个 Handler,它采用了哪个线程的消息队列,就决定了它能和哪个线程通信。例如:
```java
EventBus(EventBusBuilder builder) { ...
mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10); // 获取主线程的 MainLooper
}
```
2. backgroundPoster:
负责处理后台线程的 backgroundPoster 是类似于上述主线程 Poster 的实现,其源码与主线程 Poster 类似。主要区别在于它是在子线程中处理消息。例如:
```java
private static final class BackgroundPoster extends Handler { ...
}
```
3. asyncPoster:
负责处理异步任务的 asyncPoster 也是类似于上述主线程 Poster 的实现,其源码与主线程 Poster 类似。主要区别在于它是在单独的异步线程中处理消息。例如:
```java
private static final class AsyncPoster extends Handler { ...
}
```
EventBus是一个扩展了Handler的类,它通过Handler的消息机制实现了多线程切换。在这个过程中,Handler本身还增加了一个队列层。
EventBus使用了两个线程池:backgroundPoster和asyncPoster。这两个线程池都使用了默认的缓存线程池:DEFAULT_EXECUTOR_SERVICE,即Executors.newCachedThreadPool()。这意味着它们都将任务交给线程池处理,从而实现多线程切换。
尽管backgroundPoster和asyncPoster在功能上相似,但它们之间还是存在一些差异。首先,在newCachedThreadPool中,最大线程数就是Integer的最大值,相当于不设上限,因此可以尽可能多地启动线程。这就是为什么asyncPoster在enqueue和run方法中都没有进行同步操作,而是为每个事件单独开启新线程处理的原因。
而在backgroundPoster中,为了尽量复用线程,主要方法是在run方法中加入一个1秒的等待时间:
```java
@Override public void run() { ... PendingPost pendingPost = queue.poll(1000);//允许等待1秒
```
由于进行了这一秒的挂起等待,所以在enqueue和run方法中都需要使用synchronized关键字来确保线程安全。
此外,还有一个重要的用法是Executors.newCachedThreadPool()中的SynchronousQueue。
SynchronousQueue是一个阻塞队列,它不是数据等线程,而是线程等数据。这样每次向SynchronousQueue里传入数据时,都会立即交给一个线程执行,这样可以提高数据得到处理的速度。EventBus采用线程池实现工作线程,采用handler机制通知到主线程。不同在于,它采用的queue的队列方式来管理所有的跨线程请求,而且它利用了SynchronousQueue阻塞队列来辅助实现线程切换。RxJava的多线程切换能力非常令人赞叹。RxJava的主要概念是工作流,它可以把一序列工作流定义在一个线程类型上 。
构建工作流的过程相当复杂,但如果我们只关注线程操作部分,流程会变得非常清晰。让我们跟踪一下subscribeOn的源码(RxJava2):
```java
public final Flowable
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
}
public final Flowable
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn
}
```
接下来,我们进入FlowableSubscribeOn类:
在这个类中,我们可以看到subscribeOn方法接受一个Scheduler参数和一个布尔值requestOn。Scheduler参数用于指定执行任务的线程调度器,而requestOn参数则表示是否在订阅时立即执行任务。
首先,我们使用ObjectHelper.requireNonNull方法检查传入的scheduler参数是否为null,如果为null则抛出异常。然后,根据传入的scheduler和requestOn参数创建一个新的FlowableSubscribeOn对象,并调用RxJavaPlugins.onAssembly方法进行组装。最后,返回组装好的Flowable对象。
```java
// 进入FlowableSubscribeOn类
public FlowableSubscribeOn(Flowable
...
this.scheduler = scheduler;
...
}
@Override
public void subscribeActual(final Subscriber super T> s) {
Scheduler.Worker w = scheduler.createWorker(); // 根据参数值,如Schedulers.io()创建worker
final SubscribeOnSubscriber
s.onSubscribe(sos);
w.schedule(sos);
}
```
其中,SubscribeOnSubscriber是一个内部类。
SubscribeOnSubscriber 类的构造方法接收一个 Subscriber 类型的参数 actual,一个 Scheduler.Worker 类型的参数 worker,一个 Publisher
在构造方法中,首先对传入的实际订阅者进行类型检查,确保它是 Subscriber super T> 类型。然后将传入的工作线程赋值给 this.worker。最后调用 super.onSubscribe(actual) 方法,通知实际订阅者已经准备好接收数据。
在 requestUpstream 方法中,首先创建一个 Request 对象,将传入的订阅信息和请求的数量封装在其中。然后调用 worker.schedule() 方法,将 Request 对象提交到工作线程中执行。这里的 worker.schedule() 方法会根据传入的 Request 对象来决定如何执行任务(Request 是一个实现了 Runnable 接口的对象)。
```java
// ioreactivex.schedulers.Schedulers源码
public class SchedulersUtils {
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
// ...省略其他代码...
interface Callable
T call() throws Exception;
}
static final class IOTask implements Callable
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class NewThreadTask implements Callable
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
static final class SingleTask implements Callable
@Override
public Scheduler call() throws Exception {
return SingleHolder.DEFAULT;
}
}
static final class ComputationTask implements Callable
@Override
public Scheduler call() throws Exception {
return ComputationHolder.DEFAULT;
}
}
static final class SingleHolder {
static final Scheduler DEFAULT = new SingleScheduler();
}
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
}
```
以下是重构后的代码,保持了原有的段落结构:
```java
// io.reactivex.internal.schedulers.IoScheduler源码
static final class EventLoopWorker extends Scheduler.Worker {
// Scheduler.Worker的实现类
// ...
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// 如果已经被取消订阅,则不进行调度
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks); // 将任务交给线程池处理
}
}
```
在这段代码中,我们可以看到IO操作最终会指向一个Scheduler,例如IoScheduler。当调用Android主线程(AndroidSchedulers.mainThread)时,实际上仍然使用了Handler机制:
```java
public final class AndroidSchedulers {
// ...
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
```
这里的HandlerScheduler实际上就是实现了Scheduler和Scheduler.Worker内部类。
HandlerScheduler是RxJava提供的一个调度器,它继承自Scheduler类。HandlerScheduler的主要作用是将任务交给Handler的Worker去做,而这个Worker是根据定义的线程来实现了不同的线程池,实际上还是交给线程池去处理了。至于主线程,RxJava也使用了Handler机制。
在HandlerScheduler中,我们可以看到一个内部类:HandlerWorker,它继承自Worker类。Worker类是用于管理线程池中的线程的,而HandlerWorker则是用于处理消息的。在这个内部类中,我们可以看到一个重写的schedule方法,该方法接受一个Runnable对象、一个延迟时间和一个时间单位作为参数。在这个方法中,我们首先创建了一个Message对象,然后将其发送给Handler,并设置了一个最大延迟时间。这样,当延迟时间到达时,Handler就会调用message.replyTo.send方法来处理这个消息。
总结一下,Android中的多线程切换主要使用Runnable和Callable来定义工作内容,使用线程池来实现异步并行,使用Handler机制来通知主线程,有些场景下会视情况需要使用Future接口回调或使用SynchronousQueue阻塞队列等。