---
title: OkHttp Asynchronous Request Flow Source Code Analysis
date: 2020-06-09 16:57:00
tags: [kotlin]
categories: [kotlin]
author: Karl
---

# Asynchronous requests

A typical asynchronous call looks like this:

~~~kotlin
// enqueue returns Unit, so there is nothing useful to assign to a variable.
OkHttpClient().newCall(request).enqueue(object : Callback {
    override fun onFailure(call: Call, e: IOException) {
        TODO("Not yet implemented")
    }

    override fun onResponse(call: Call, response: Response) {
        TODO("Not yet implemented")
    }
})
~~~

Looking at the source of enqueue, we find it implemented in RealCall:

~~~java
@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();
  eventListener.callStart(this);
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
~~~

The flow is much the same as the synchronous one, so we go straight to enqueue in Dispatcher:

~~~java
void enqueue(AsyncCall call) {
  synchronized (this) {
    readyAsyncCalls.add(call);
  }
  promoteAndExecute();
}
~~~

Again this is not very different from the synchronous case: the call is simply added to the queue of asynchronous calls waiting to run. Next we step into promoteAndExecute. Judging by its name, this method promotes the calls that are eligible to run and then executes them.

~~~java
private boolean promoteAndExecute() {
  assert (!Thread.holdsLock(this));

  List<AsyncCall> executableCalls = new ArrayList<>();
  boolean isRunning;
  synchronized (this) {
    for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
      AsyncCall asyncCall = i.next();

      if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
      if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.

      i.remove();
      executableCalls.add(asyncCall);
      runningAsyncCalls.add(asyncCall);
    }
    isRunning = runningCallsCount() > 0;
  }

  for (int i = 0, size = executableCalls.size(); i < size; i++) {
    AsyncCall asyncCall = executableCalls.get(i);
    asyncCall.executeOn(executorService());
  }

  return isRunning;
}
~~~

The method first asserts that the current thread does not hold the Dispatcher lock, then creates a list of AsyncCall objects. Inside the synchronized block it walks the ready queue: it stops as soon as the number of running calls reaches maxRequests, and it skips any call whose host already has maxRequestsPerHost calls in flight. Every other call is removed from the ready queue and added both to the local list of executable calls and to the runningAsyncCalls queue. Once the lock is released, each executable call is handed to the executorService.

Let's see what executorService is:

~~~java
public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return executorService;
}
~~~

If no thread pool exists yet, one is created and returned. The pool has no core threads, an effectively unbounded maximum size, a 60-second keep-alive for idle threads, and a SynchronousQueue, so each submitted call is handed directly to a thread instead of waiting in a queue.

Now let's see what executeOn does:

~~~java
void executeOn(ExecutorService executorService) {
  assert (!Thread.holdsLock(client.dispatcher()));
  boolean success = false;
  try {
    executorService.execute(this);
    success = true;
  } catch (RejectedExecutionException e) {
    InterruptedIOException ioException = new InterruptedIOException("executor rejected");
    ioException.initCause(e);
    eventListener.callFailed(RealCall.this, ioException);
    responseCallback.onFailure(RealCall.this, ioException);
  } finally {
    if (!success) {
      client.dispatcher().finished(this); // This call is no longer running!
    }
  }
}
~~~

This simply runs the call on the thread pool. If the executor rejects it, the failure is reported through the callback and the call is marked as finished. Next we look at the code that does the real work.

## AsyncCall

Here is the source:

~~~java
final class AsyncCall extends NamedRunnable {
  private final Callback responseCallback;

  AsyncCall(Callback responseCallback) {
    super("OkHttp %s", redactedUrl());
    this.responseCallback = responseCallback;
  }

  String host() {
    return originalRequest.url().host();
  }

  Request request() {
    return originalRequest;
  }

  RealCall get() {
    return RealCall.this;
  }

  /**
   * Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
   * if the executor has been shut down by reporting the call as failed.
   */
  void executeOn(ExecutorService executorService) {
    assert (!Thread.holdsLock(client.dispatcher()));
    boolean success = false;
    try {
      executorService.execute(this);
      success = true;
    } catch (RejectedExecutionException e) {
      InterruptedIOException ioException = new InterruptedIOException("executor rejected");
      ioException.initCause(e);
      eventListener.callFailed(RealCall.this, ioException);
      responseCallback.onFailure(RealCall.this, ioException);
    } finally {
      if (!success) {
        client.dispatcher().finished(this); // This call is no longer running!
      }
    }
  }

  @Override protected void execute() {
    boolean signalledCallback = false;
    timeout.enter();
    try {
      Response response = getResponseWithInterceptorChain();
      if (retryAndFollowUpInterceptor.isCanceled()) {
        signalledCallback = true;
        responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
      } else {
        signalledCallback = true;
        responseCallback.onResponse(RealCall.this, response);
      }
    } catch (IOException e) {
      e = timeoutExit(e);
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
      } else {
        eventListener.callFailed(RealCall.this, e);
        responseCallback.onFailure(RealCall.this, e);
      }
    } finally {
      client.dispatcher().finished(this);
    }
  }
}
~~~

AsyncCall extends NamedRunnable, so let's see what NamedRunnable is:

~~~java
public abstract class NamedRunnable implements Runnable {
  protected final String name;

  public NamedRunnable(String format, Object... args) {
    this.name = Util.format(format, args);
  }

  @Override public final void run() {
    String oldName = Thread.currentThread().getName();
    Thread.currentThread().setName(name);
    try {
      execute();
    } finally {
      Thread.currentThread().setName(oldName);
    }
  }

  protected abstract void execute();
}
~~~

NamedRunnable is an abstract class. Looking at its run method, we can see that it sets the name of the current thread to the name passed to the constructor, calls execute, and restores the old name in the finally block. So we go back to AsyncCall and find its execute method:

~~~java
@Override protected void execute() {
  boolean signalledCallback = false;
  timeout.enter();
  try {
    Response response = getResponseWithInterceptorChain();
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
    } else {
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);
    }
  } catch (IOException e) {
    e = timeoutExit(e);
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      eventListener.callFailed(RealCall.this, e);
      responseCallback.onFailure(RealCall.this, e);
    }
  } finally {
    client.dispatcher().finished(this);
  }
}
~~~

Here we see the familiar Response object and the getResponseWithInterceptorChain method. What follows is just the callbacks that report the outcome, so we won't analyse it further. That concludes the source analysis of the asynchronous flow; next time we will step into a few of the default interceptors.
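
To make the ready-to-running promotion easier to experiment with, here is a minimal, self-contained sketch that uses only the JDK. It is not OkHttp code: `MiniDispatcher`, `Task`, `demo`, and the host names are hypothetical names invented for illustration, and the assumption that finishing a task re-runs the promotion belongs to this sketch. It only mirrors the two checks (a global limit and a per-host limit) and the hand-off to a thread pool shaped like the one in `executorService()`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified model of the ready -> running promotion. Not OkHttp code. */
class MiniDispatcher {
    private final int maxRequests;
    private final int maxRequestsPerHost;
    private final Deque<Task> ready = new ArrayDeque<>();
    private final Deque<Task> running = new ArrayDeque<>();
    // Same pool shape as the article's executorService(): no core threads, direct hand-off.
    private final ExecutorService executor = new ThreadPoolExecutor(
            0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

    MiniDispatcher(int maxRequests, int maxRequestsPerHost) {
        this.maxRequests = maxRequests;
        this.maxRequestsPerHost = maxRequestsPerHost;
    }

    /** Stands in for AsyncCall: a body plus the host it targets. */
    private final class Task implements Runnable {
        final String host;
        final Runnable body;
        Task(String host, Runnable body) { this.host = host; this.body = body; }
        @Override public void run() {
            try { body.run(); } finally { finished(this); }
        }
    }

    void enqueue(String host, Runnable body) {
        synchronized (this) { ready.add(new Task(host, body)); }
        promoteAndExecute();
    }

    // Assumption of this sketch: finishing a task frees a slot and re-runs the promotion.
    private void finished(Task task) {
        synchronized (this) { running.remove(task); }
        promoteAndExecute();
    }

    private void promoteAndExecute() {
        List<Task> executable = new ArrayList<>();
        synchronized (this) {
            for (Iterator<Task> i = ready.iterator(); i.hasNext(); ) {
                Task task = i.next();
                if (running.size() >= maxRequests) break;                      // global cap reached
                if (runningForHost(task.host) >= maxRequestsPerHost) continue; // host cap reached
                i.remove();
                executable.add(task);
                running.add(task);
            }
        }
        // The hand-off to the pool happens outside the lock.
        for (Task task : executable) executor.execute(task);
    }

    private int runningForHost(String host) { // caller must hold the lock
        int count = 0;
        for (Task t : running) if (t.host.equals(host)) count++;
        return count;
    }

    synchronized int readyCount() { return ready.size(); }
    synchronized int runningCount() { return running.size(); }

    /** Pushes four blocked tasks through a (2 total, 1 per host) dispatcher. */
    static int[] demo() {
        MiniDispatcher d = new MiniDispatcher(2, 1);
        CountDownLatch gate = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(4);
        Runnable body = () -> {
            try { gate.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            done.countDown();
        };
        for (String host : new String[] {"a.example", "a.example", "b.example", "c.example"}) {
            d.enqueue(host, body);
        }
        int runningWhileBlocked = d.runningCount();
        int readyWhileBlocked = d.readyCount();
        gate.countDown(); // let every task finish
        try {
            boolean allRan = done.await(5, TimeUnit.SECONDS);
            d.executor.shutdown();
            d.executor.awaitTermination(5, TimeUnit.SECONDS);
            return new int[] {runningWhileBlocked, readyWhileBlocked, allRan ? 1 : 0,
                    d.runningCount(), d.readyCount()};
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("while blocked: running=" + r[0] + " ready=" + r[1]);
        System.out.println("after release: running=" + r[3] + " ready=" + r[4]);
    }
}
```

In `demo`, the second `a.example` task is skipped by the per-host check and the `c.example` task is stopped by the global check, so two tasks run while two wait. Once the gate opens, each finishing task triggers another promotion until both queues are empty.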