一路走来,我在安卓并发路上踩的坑

概述

个人愚见,安卓开发和服务端的开发不太一样,Java服务端的开发主要是处理大量的IO,数据库查询等,但是我在做安卓开发以来,主要用到的并发方面的场景,主要包括多线程的上传和下载数量较多的图片,别的较大并发的场景还真不太多,等我想到了会补充在这里,可能是我个人愚笨吧,这一路走来,在这个在大神看来并不复杂的问题上,我走了很多弯路,也一直在这方面进行思考,带着这些问题,通过不断的在google,stackoverflow上阅读相关的资料以及简书一类的博客,终于算是对这一方面比较清晰了。在我截下来要要描述的场景中,没有资源的竞争的发生,并发传图片或者下载图片嘛,你懂的,所以如果是关注这方面的童鞋可以绕过,我后面也打算写一篇关于Java中互斥资源的文章。

基础知识回顾

同步机制与异步机制编程

如果我们打算干一个活,如果这个活是比较耗费时间的,比如网络IO,数据库查询等,那么干这个活有两种机制可以选,分别是同步机制和异步机制。同步机制是指要干活了,那么就自己老老实实干,直到干完了,才去干别的活;而异步机制比较聪明,假设他有能够召唤的小弟,那么这个活到他手上的时候,他就叫一个小弟来然后对他说,你帮我干这个活,等你干完活了把结果给我汇报一下,他最后只需要这些小弟干的活进行总结就行了;以上是我自己的比喻,虽然不是很贴切,但是讲述了基本思想。
比较两种干活方式,明显第一种比较傻,第二种方式能够让资源的到充分的利用。两种场景中,映射到计算机世界里面的,小弟就是线程的概念,如果计算机允许开启多线程的话,我们就可以选去这聪明的方式,这样就算有很多活,如果他有足够的小弟,那么这些小弟都在工作,没有空闲的,那么工作效率一定很高,但是小弟的数量是有限的,就像每个进程中线程的开启数量是有讲究的,不能开启太多,因为系统资源毕竟有限。
小弟间能不能愉快的干活,需要一个管理员或者叫做公关,能够比较好的管理它们,总不能让它们因为彼此拿了对方的东西打架吧,这个时候在计算机世界里面,线程池就引入了,好了,这里是泛泛的讲,就不深入了。

线程和进程

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。程序员可以通过它进行多处理器编程,你可以使用多线程对运算密集型任务提速。一个进程是一个独立(self contained)的运行环境,它可以被看作一个程序或者一个应用。而线程是在进程中执行的一个任务。线程是进程的子集,一个进程可以有很多线程,每条线程并行执行不同的任务。不同的进程使用不同的内存空间,而所有的线程共享一片相同的内存空间。

Runnable和Callable

Runnable和Callable都代表那些要在不同的线程中执行的任务。Runnable的活都在run()方法里面干,干完了不用返回结果,而Callable的活在call()中去执行,该方法可以返回值和抛出异常,Callable可以返回装载有计算结果的Future对象,简而言之,就是一个需要回报结果,一个直接干完了事。

Future和Callable

既然Callable返回一个装有计算结果的Future对象,那么Future又是神马呢?Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法去在线程池中执行Callable内的任务。由于Callable任务是并行的,我们必须等待它返回的结果,Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它我们可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

Executor,Executors,ExecutorService

Executor是一个接口,它只有一个方法void execute(Runnable command),即接受一个Runnable去执行,这是最顶层的抽象,可以发现,这里并没有Callable的踪迹,看来Java设计者一开始并没有返回结果方面的打算。ExecutorService是Executor的实现类,但是它不仅仅是实现了接受Runnable去执行的方法,同时还添加了别的接受Callable即带返回结果的方法,这些方法都返回一个封装在Future中的结果。那么Executors类是干什么用的呢,我的理解是它是一个辅助类,Executors为Executor,ExecutorService,ScheduledExecutorService,ThreadFactory和Callable类提供了一些工具方法,Executors可以用于方便的创建线程池。比如我用的最多的场景的的固定数量线程池,该ExecutorService可以通过Executors.newFixedThreadPool(MAX_THREAD_COUNT)来创建。ThreadPoolExecutor是一个ExecutorService的实现,这个类有很多参数可以选择,但是参数多了,你懂的,业余选手很可能搞得很糟糕,那么多年以来,官方提供的一些方法,可能更适用,至少它是经过了很多开发者生产环境下测试过的,本文在这里也不打算展开讲,后面考虑去总结一下这方面。

CountDownLatch,CyclicBarrier和Semaphore
ExecutorService的shutdown(),shutdownNow()和awaitTermination()

调用shutdown()以后,之前提交的任务都会被执行,但是新提交的任务则不会被允许放入任务队列中。如果之前被调用过了的话,那么再次调用也没什么用。这个方法不会等待之前提交的任务完成执行,如果希望的话,则需要调用awaitTermiante方法。

结合案例

需求分析

从网上下载1000张图片保存,更新进度,支持主动取消是失败重传三次机制,哒哒哒,问题超级简单是吧,但是就是这么一个小功能,我在具体实践的时候,发现并不是那么容易,而且一路过来我踩了很多坑,但是这些坑一路才过来,让我对这方面更有经验了,而且也对这一类的场景有了更多的理解。在本文中,我们先去下载网上的一张图片100次吧,这张图片的地址是:https://avatars2.githubusercontent.com/u/6789420?v=3&s=460,我的logo,哈哈

第一版 乞丐版

第一版当然是比较挫的咯,不过居然能想到用线程池,还是加一点分的撒,先来写一个最简单的下载函数吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DownloadUtil {
public static void download(Callback callback) {
try {
URL website = new URL("https://avatars2.githubusercontent.com/u/6789420?v=3&s=460");
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
FileOutputStream fos = new FileOutputStream("xxx.png");
fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
callback.onSuccess();
} catch (IOException e) {
callback.onFail();
e.printStackTrace();
}
}
public interface Callback {
void onSuccess();
void onFail();
}
}

经过我试验,在我家的wifi情况下,多线程下载的时候是平均1.5秒一张图,好了,我们的下载没有返回值,那么打包成一个Runnable即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class DownloadJob implements Runnable {
private String identifier;
public DownloadJob(String identifier) {
this.identifier = identifier;
}
@Override
public void run() {
DownloadUtil.download(new DownloadUtil.Callback() {
@Override
public void onSuccess() {
System.out.println("download job " + identifier + " success");
}
@Override
public void onFail() {
System.out.println("download job " + identifier + " fail");
}
});
}
}

准备工作就是这么两个简单的类,一个是阻塞返回成功和失败回调的函数,还有一个是Runnable任务类,记得我第一次如下调用的。

1
2
3
4
5
6
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
pool.submit(new DownloadJob(String.valueOf(i)));
}
}

好了,以上就是我调用的方式,其实也没什么问题,如果仅仅是不带取消且不监测进度的上传,那么perfect,但是我们的产品设计的需求里面包含点击按钮就取消下载任务,一开始,看到ExecutorService的shutdown()函数,开心死了,心想,这也太简单了吧,使用它不就可以停止任务了嘛。于是果断适用了它来终结任务。

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
ExecutorService pool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
pool.submit(new DownloadJob(String.valueOf(i)));
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.shutdown();
}

满怀期待,所有的线程都会停止,but,shit happened,所有的线程并不理我,疯了一样继续运行着,跟没事一样,so what happened,于是去看shutdown()的解释:调用shutdown()以后,之前提交的任务都会被执行,但是新提交的任务则不会被允许放入任务队列中。如果之前被调用过了的话,那么再次调用也没什么用。这个方法不会等待之前提交的任务完成执行,如果希望的话,则需要调用awaitTermiante方法。这样就说的通了,shutdown()并不是我们想象的去叫停正在工作的线程,而是将池子关闭,不去接受新的任务,那么已经放入的任务不受影响,会继续走完。这不是我们想要的,那么在ExecutorService里面发现了另外一个API,shutdownNow(),这个看上去厉害了,立刻关闭,于是拿过来,将上面的pool.shutdown()改为pool.shutdownNow(),好家伙,它一上场,的确停住了,Log如下:

1
2
3
4
5
6
7
8
9
10
11
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.FileChannelImpl.size(FileChannelImpl.java:315)
at sun.nio.ch.FileChannelImpl.transferFrom(FileChannelImpl.java:704)
at DownloadUtil.download(DownloadUtil.java:17)
at DownloadJob.run(DownloadJob.java:15)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Log包含同样的Exception三次,但是,总的来说,任务是停住了,它把现有的在run的任务也给停住了,用更专业的叫法是interupted了,如果这个线程是能够被interupted的话,那么这个办法的确吊吊的,可是在我看来,这种方式并不理想,原因有三点:
首先,该方法用抛出异常的方式去终结,比较理想的方式是让已经运行了的线程去执行完毕,而没有开始的线程就不要执行了,对于这一点,ExecutorService的Api就没辙了,
其次,回头去看shutdownNow()这个函数的注释里的一段话,

1
There are no guarantees beyond best-effort attempts to stop processing actively executing tasks. For example, typical implementations will cancel via {@link Thread#interrupt}, so any task that fails to respond to interrupts may never terminate.

这段话讲的很清楚,如果我们的线程不能通过线程的interupt方法去取消的话,那么这个线程可能再也无法去终结了,可见,该方法是用场合比较受限,当然我们这种是可以的。
再然后,当调用了shutdown()或者shutdownNow()以后,池子就不能再使用了,退出了循环。我们可以在shutdown()后再去加一个添加任务的方法:

1
2
3
4
5
public static void main(String[] args) {
///...
pool.shutdown();
pool.submit(new DownloadJob("101"));
}

代码会抛出如下异常:

1
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@4d7e1886 rejected from java.util.concurrent

也就是说线程池拒绝调度该Runnable了。好了,在这里卡了一段时间,没辙了,于是放弃了,退而求其次吧,自己手动去管理呗,不就是控制正在run的线程的数量吗?这点算法,哥还是手动调度的来的,于是有了接下来南辕北辙的第二版。

第二版 朴素版 初步实现功能->

第二版中,典型的闭门造车的故事,好吧,虽然代码丑陋,但是毕竟还是成功的调度了,至少能够工作,主要用到了AtomicInteger,Semaphore,LinkedBlockingDeque和一些辅助变量。
将DownloadUtil的阻塞下载方放到线程中去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DownloadUtil {
public static void download(Callback callback, String identifier) {
new Thread(new Runnable() {
@Override
public void run() {
try {
URL website = new URL("https://avatars2.githubusercontent.com/u/6789420?v=3&s=460");
ReadableByteChannel rbc = Channels.newChannel(website.openStream());
FileOutputStream fos = new FileOutputStream("xxx.png");
fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
callback.onSuccess();
System.out.println(identifier + " download success");
} catch (IOException e) {
callback.onFail();
System.out.println(identifier + " download fail");
e.printStackTrace();
}
}
}).start();
}
public interface Callback {
void onSuccess();
void onFail();
}
}

然后我们的调用方法,就变成了下面的样子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class Downloader {
private boolean stopFlag = false;
public void terminate() {
System.out.println("try to terminate");
stopFlag = true;
}
public void downloadImages() {
Queue<Integer> queue = new LinkedBlockingDeque<>();
for (int i = 0; i < 20; i++) {
queue.add(i);
}
Semaphore semaphore = new Semaphore(3);
while (!queue.isEmpty() && !stopFlag) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
DownloadUtil.download(new DownloadUtil.Callback() {
@Override
public void onSuccess() {
semaphore.release();
}
@Override
public void onFail() {
semaphore.release();
}
}, String.valueOf(queue.poll()));
}
}
}

在main方法中,我们只需要

1
2
Downloader downloader = new Downloader();
downloader.downloadImages();

这个办法虽然每个IO下载任务都是放到了线程总去,但是在Semaphore那里的话,还是会阻塞,主线程也是被卡住的,但是回调通知是可以的,没办法,为了完成任务,我的手动控制就是这么实现的,至少把任务完成了,但是回头看,好多的缺点呀,需要改进。

第三版 改进版->从Future入手

我们的直觉告诉我们,作为管理员的ExecutorService才是开启或者叫停线程的,但事实上,这并不是设计者的初衷,之所以没有采用这种机制,我个人的见解是这样会引入一些问题,在我们的场景中,我们的任务都是单一的下载图片,但是,如果这个线程池里面还有一个需要长时间链接下载电影的线程呢?如果我们通过管理员去关闭了线程池,那岂不是里面所有的活都会被interupt而停止,也许你会说,那我们可以新建一个新的池子啊,但是你想,这不是浪费资源么,如果每下载一个东西就去新建一个池子,那么,这个池子的意义何在呢?既然是任务池,那么就应该是一直存在,活来了就干,当然我们想要销毁也能支持销毁,这个屌用shutdown(),循环就退出了。
直觉的东西不一定是对的,有些时候,我们需要换个思路,我们能不能通过任务Runnable本身呢,事实证明Runnable本身就只有一个run()方法,没有任何可以操纵的API,所以果断放弃。其实我们的每个Runnable在加入池子的时候,会返回我们一个可操纵句柄的,这个句柄就是Future,Future中有三个方法比较重要,分别是get(),cancel(boolean interupted)和isDone();get()方法会等待线程池调度该线程后的返回结果,期间会阻塞,cancel()可以用于取消提交到池子中的任务,参数interupted用于控制是否向已经在工作的线程发出interupt终止信号,在我的需求中,答案是NO,isDone()用于判断这个任务是否已经被调度执行完了,这个在判断是否要对某个任务去执行取消操作很有用。
好了,上代码,仍然沿用第一版的其它代码,只是在使用的时候,使用Future集合去管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
ExecutorService pool = Executors.newFixedThreadPool(3);
List<Future> tasks = new ArrayList();
for (int i = 0; i < 100; i++) {
tasks.add(pool.submit(new DownloadJob(String.valueOf(i))));
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//这里想要在两秒中以后执行叫停操作
for(Future task : tasks){
if(!task.isDone()){
task.cancel(false);
}
}

经事实验证,效果妥妥的,三个版本就是我的传图片方面的演变,第三版基本上可以应付大多数场合了。

代码重构

工欲善其事,必先利其器,有了以上的基础知识储备,是时间回头去重构以前的一些肮脏代码了。

浏览并离线下载图片页面的重构

之前的一位同事在写离线下载页面的时候,可能在并发方面的经验不足,所以有很多的问题,代码很脏。
我们的这个需求是这样子的,