233 lines
7.7 KiB
Markdown
233 lines
7.7 KiB
Markdown
---
|
||
title: Okhttp异步流程源码分析
|
||
date: 2020-06-09 16:57:00
|
||
tags: [kotlin]
|
||
categories: [kotlin]
|
||
author: Karl
|
||
---
|
||
# 异步请求
|
||
一般的异步调用代码为
|
||
~~~kotlin
|
||
val enqueueResponse = OkHttpClient().newCall(request).enqueue(object: Callback {
|
||
override fun onFailure(call: Call, e: IOException) {
|
||
TODO("Not yet implemented")
|
||
}
|
||
|
||
override fun onResponse(call: Call, response: Response) {
|
||
TODO("Not yet implemented")
|
||
}
|
||
})
|
||
~~~
|
||
查看enqueue源码,在RealCall中实现
|
||
~~~java
|
||
@Override public void enqueue(Callback responseCallback) {
|
||
synchronized (this) {
|
||
if (executed) throw new IllegalStateException("Already Executed");
|
||
executed = true;
|
||
}
|
||
captureCallStackTrace();
|
||
eventListener.callStart(this);
|
||
client.dispatcher().enqueue(new AsyncCall(responseCallback));
|
||
}
|
||
~~~
|
||
流程跟同步的没什么差别,我们直接看dispathcer中的enqueue
|
||
~~~java
|
||
void enqueue(AsyncCall call) {
|
||
synchronized (this) {
|
||
readyAsyncCalls.add(call);
|
||
}
|
||
promoteAndExecute();
|
||
}
|
||
~~~
|
||
这里跟同步的也并没有什么差别,只是将请求加入了异步等待队列,我们在进入promoteAndExecute方法,从命名上看这个方法应该是校验和执行的方法
|
||
~~~java
|
||
private boolean promoteAndExecute() {
|
||
assert (!Thread.holdsLock(this));
|
||
|
||
List<AsyncCall> executableCalls = new ArrayList<>();
|
||
boolean isRunning;
|
||
synchronized (this) {
|
||
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
|
||
AsyncCall asyncCall = i.next();
|
||
|
||
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
|
||
if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.
|
||
|
||
i.remove();
|
||
executableCalls.add(asyncCall);
|
||
runningAsyncCalls.add(asyncCall);
|
||
}
|
||
isRunning = runningCallsCount() > 0;
|
||
}
|
||
|
||
for (int i = 0, size = executableCalls.size(); i < size; i++) {
|
||
AsyncCall asyncCall = executableCalls.get(i);
|
||
asyncCall.executeOn(executorService());
|
||
}
|
||
|
||
return isRunning;
|
||
}
|
||
~~~
|
||
首先判断判断线程是否阻塞,创建一个AsyncCall对象的列表
|
||
对请求加锁
|
||
从队列中取出请求将请求添加进执行请求队列和异步执行队列runningAsyncCall中
|
||
然后将其放入executorService对象中执行
|
||
我们去查看executorService是什么
|
||
~~~java
|
||
public synchronized ExecutorService executorService() {
|
||
if (executorService == null) {
|
||
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
|
||
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
|
||
}
|
||
return executorService;
|
||
}
|
||
~~~
|
||
我们发现这里如果没有线程池的话,创建一个线程池并返回该线程池
|
||
我们在看看executeOn干了什么
|
||
~~~java
|
||
void executeOn(ExecutorService executorService) {
|
||
assert (!Thread.holdsLock(client.dispatcher()));
|
||
boolean success = false;
|
||
try {
|
||
executorService.execute(this);
|
||
success = true;
|
||
} catch (RejectedExecutionException e) {
|
||
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
|
||
ioException.initCause(e);
|
||
eventListener.callFailed(RealCall.this, ioException);
|
||
responseCallback.onFailure(RealCall.this, ioException);
|
||
} finally {
|
||
if (!success) {
|
||
client.dispatcher().finished(this); // This call is no longer running!
|
||
}
|
||
}
|
||
}
|
||
~~~
|
||
我们看到这里只是单纯的使用线程池执行而已
|
||
我们在去看看执行的核心代码
|
||
## AsyncCall
|
||
查看源码
|
||
~~~java
|
||
|
||
final class AsyncCall extends NamedRunnable {
|
||
private final Callback responseCallback;
|
||
|
||
AsyncCall(Callback responseCallback) {
|
||
super("OkHttp %s", redactedUrl());
|
||
this.responseCallback = responseCallback;
|
||
}
|
||
|
||
String host() {
|
||
return originalRequest.url().host();
|
||
}
|
||
|
||
Request request() {
|
||
return originalRequest;
|
||
}
|
||
|
||
RealCall get() {
|
||
return RealCall.this;
|
||
}
|
||
|
||
/**
|
||
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
|
||
* if the executor has been shut down by reporting the call as failed.
|
||
*/
|
||
void executeOn(ExecutorService executorService) {
|
||
assert (!Thread.holdsLock(client.dispatcher()));
|
||
boolean success = false;
|
||
try {
|
||
executorService.execute(this);
|
||
success = true;
|
||
} catch (RejectedExecutionException e) {
|
||
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
|
||
ioException.initCause(e);
|
||
eventListener.callFailed(RealCall.this, ioException);
|
||
responseCallback.onFailure(RealCall.this, ioException);
|
||
} finally {
|
||
if (!success) {
|
||
client.dispatcher().finished(this); // This call is no longer running!
|
||
}
|
||
}
|
||
}
|
||
|
||
@Override protected void execute() {
|
||
boolean signalledCallback = false;
|
||
timeout.enter();
|
||
try {
|
||
Response response = getResponseWithInterceptorChain();
|
||
if (retryAndFollowUpInterceptor.isCanceled()) {
|
||
signalledCallback = true;
|
||
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
|
||
} else {
|
||
signalledCallback = true;
|
||
responseCallback.onResponse(RealCall.this, response);
|
||
}
|
||
} catch (IOException e) {
|
||
e = timeoutExit(e);
|
||
if (signalledCallback) {
|
||
// Do not signal the callback twice!
|
||
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
|
||
} else {
|
||
eventListener.callFailed(RealCall.this, e);
|
||
responseCallback.onFailure(RealCall.this, e);
|
||
}
|
||
} finally {
|
||
client.dispatcher().finished(this);
|
||
}
|
||
}
|
||
}
|
||
~~~
|
||
我们发现他是继承于NamedRunnable我们在去看看NamedRunnable是什么
|
||
~~~java
|
||
public abstract class NamedRunnable implements Runnable {
|
||
protected final String name;
|
||
|
||
public NamedRunnable(String format, Object... args) {
|
||
this.name = Util.format(format, args);
|
||
}
|
||
|
||
@Override public final void run() {
|
||
String oldName = Thread.currentThread().getName();
|
||
Thread.currentThread().setName(name);
|
||
try {
|
||
execute();
|
||
} finally {
|
||
Thread.currentThread().setName(oldName);
|
||
}
|
||
}
|
||
|
||
protected abstract void execute();
|
||
}
|
||
~~~
|
||
可以看到NamedRunnable是一个抽象类,我们直接看run方法,可以看到,这里将当前执行的线程的名字设为我们在构造方法中传入的名字,接着执行execute方法,finally再设置回来。所以我们在回到AsyCall找execute方法了。
|
||
~~~java
|
||
@Override protected void execute() {
|
||
boolean signalledCallback = false;
|
||
timeout.enter();
|
||
try {
|
||
Response response = getResponseWithInterceptorChain();
|
||
if (retryAndFollowUpInterceptor.isCanceled()) {
|
||
signalledCallback = true;
|
||
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
|
||
} else {
|
||
signalledCallback = true;
|
||
responseCallback.onResponse(RealCall.this, response);
|
||
}
|
||
} catch (IOException e) {
|
||
e = timeoutExit(e);
|
||
if (signalledCallback) {
|
||
// Do not signal the callback twice!
|
||
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
|
||
} else {
|
||
eventListener.callFailed(RealCall.this, e);
|
||
responseCallback.onFailure(RealCall.this, e);
|
||
}
|
||
} finally {
|
||
client.dispatcher().finished(this);
|
||
}
|
||
}
|
||
}
|
||
~~~
|
||
看到了我们熟悉的Response对象和getResponseWithInterceptorChain方法,后续就是一些状态的回调,就不在分析了,异步的流程源码就分析到这了,后续在进几个默认的拦截器看看
|