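This article analyzes the OkHttp 4.12.0 source code (see the official site).
Overview
This article is a detailed walkthrough of the OkHttp open-source library. It covers the request flow and each of the main interceptors.
Usage
Synchronous request: create an OkHttpClient and a Request, use them to create a Call, then call execute() to obtain the Response.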
val client = OkHttpClient()
val request = Request.Builder().url("http://www.baidu.com").build()
val response = client.newCall(request).execute()
Asynchronous request: the difference is that you call enqueue() and pass a Callback, which receives the Response.
val client = OkHttpClient()
val request = Request.Builder().url("http://www.baidu.com").build()
client.newCall(request).enqueue(object : Callback {
  override fun onFailure(call: Call, e: IOException) {
    e.printStackTrace()
  }

  override fun onResponse(call: Call, response: Response) {
    Log.d("TAG", "response = " + response.message)
    // string() reads the body content; toString() would only print the object identity
    Log.d("TAG", "response = " + response.body?.string())
  }
})
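The key classes are described below.
OkHttpClient
This is the request configuration class. It uses the builder pattern so that users can conveniently configure request parameters such as callTimeout, cookies, interceptors, and so on.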
open class OkHttpClient internal constructor(
  builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {

  @get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher

  @get:JvmName("connectionPool") val connectionPool: ConnectionPool = builder.connectionPool

  /**
   * Returns an immutable list of interceptors that observe the full span of each call: from before
   * the connection is established (if any) until after the response source is selected (either the
   * origin server, cache, or both).
   */
  @get:JvmName("interceptors") val interceptors: List<Interceptor> =
      builder.interceptors.toImmutableList()
  ....
  constructor() : this(Builder())
  ....
  /** Prepares the [request] to be executed at some point in the future. */
  override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)

  class Builder constructor() {
    internal var dispatcher: Dispatcher = Dispatcher()
    internal var connectionPool: ConnectionPool = ConnectionPool()
    internal val interceptors: MutableList<Interceptor> = mutableListOf()
    internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
    internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
    internal var retryOnConnectionFailure = true
    internal var authenticator: Authenticator = Authenticator.NONE
    internal var followRedirects = true
    internal var followSslRedirects = true
    internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
    internal var cache: Cache? = null
    internal var dns: Dns = Dns.SYSTEM
    internal var proxy: Proxy? = null
    internal var proxySelector: ProxySelector? = null
    internal var proxyAuthenticator: Authenticator = Authenticator.NONE
    internal var socketFactory: SocketFactory = SocketFactory.getDefault()
    internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
    internal var x509TrustManagerOrNull: X509TrustManager? = null
    internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
    internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
    internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
    internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
    internal var certificateChainCleaner: CertificateChainCleaner? = null
    internal var callTimeout = 0
    internal var connectTimeout = 10_000
    internal var readTimeout = 10_000
    internal var writeTimeout = 10_000
    internal var pingInterval = 0
    internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
    internal var routeDatabase: RouteDatabase? = null
    ....
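Request
This is the configuration class for the request parameters, and it also uses the builder pattern. Compared with OkHttpClient, Request is much simpler: it has only four main parameters (url, method, headers, and body), plus an internal tags map.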
class Request internal constructor(
  @get:JvmName("url") val url: HttpUrl,
  @get:JvmName("method") val method: String,
  @get:JvmName("headers") val headers: Headers,
  @get:JvmName("body") val body: RequestBody?,
  internal val tags: Map<Class<*>, Any>
) {
  ....
  open class Builder {
    internal var url: HttpUrl? = null
    internal var method: String
    internal var headers: Headers.Builder
    internal var body: RequestBody? = null
    ...
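Call
The request invocation interface. A Call represents a request that has been prepared for execution. It can be canceled, and it can be executed only once.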
interface Call : Cloneable {
  /** Returns the original request that initiated this call. */
  fun request(): Request

  /**
   * Invokes the request immediately, and blocks until the response can be processed or is in error.
   *
   * To avoid leaking resources callers should close the [Response] which in turn will close the
   * underlying [ResponseBody].
   *
   * ```
   * // ensure the response (and underlying response body) is closed
   * try (Response response = client.newCall(request).execute()) {
   *   ...
   * }
   * ```
   *
   * The caller may read the response body with the response's [Response.body] method. To avoid
   * leaking resources callers must [close the response body][ResponseBody] or the response.
   *
   * Note that transport-layer success (receiving a HTTP response code, headers and body) does not
   * necessarily indicate application-layer success: `response` may still indicate an unhappy HTTP
   * response code like 404 or 500.
   *
   * @throws IOException if the request could not be executed due to cancellation, a connectivity
   *     problem or timeout. Because networks can fail during an exchange, it is possible that the
   *     remote server accepted the request before the failure.
   * @throws IllegalStateException when the call has already been executed.
   */
  @Throws(IOException::class)
  fun execute(): Response

  /**
   * Schedules the request to be executed at some point in the future.
   *
   * The [dispatcher][OkHttpClient.dispatcher] defines when the request will run: usually
   * immediately unless there are several other requests currently being executed.
   *
   * This client will later call back `responseCallback` with either an HTTP response or a failure
   * exception.
   *
   * @throws IllegalStateException when the call has already been executed.
   */
  fun enqueue(responseCallback: Callback)

  /** Cancels the request, if possible. Requests that are already complete cannot be canceled. */
  fun cancel()

  /**
   * Returns true if this call has been either [executed][execute] or [enqueued][enqueue]. It is an
   * error to execute a call more than once.
   */
  fun isExecuted(): Boolean

  fun isCanceled(): Boolean

  /**
   * Returns a timeout that spans the entire call: resolving DNS, connecting, writing the request
   * body, server processing, and reading the response body. If the call requires redirects or
   * retries all must complete within one timeout period.
   *
   * Configure the client's default timeout with [OkHttpClient.Builder.callTimeout].
   */
  fun timeout(): Timeout

  /**
   * Create a new, identical call to this one which can be enqueued or executed even if this call
   * has already been.
   */
  public override fun clone(): Call

  fun interface Factory {
    fun newCall(request: Request): Call
  }
}
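RealCall
RealCall is the concrete implementation of the Call interface and the bridge between the application layer and the network layer. On one side it holds the application's original request and connection data; on the other it handles the response and other streams returned by the network layer. As the usage section shows, once a RealCall has been created the caller invokes either the synchronous or the asynchronous request method, so RealCall also implements execute() and enqueue().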
class RealCall(
  val client: OkHttpClient,
  /** The application's original request unadulterated by redirects or auth headers. */
  val originalRequest: Request,
  val forWebSocket: Boolean
) : Call {
  ....
  override fun execute(): Response {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    timeout.enter()
    callStart()
    try {
      client.dispatcher.executed(this)
      return getResponseWithInterceptorChain()
    } finally {
      client.dispatcher.finished(this)
    }
  }

  override fun enqueue(responseCallback: Callback) {
    check(executed.compareAndSet(false, true)) { "Already Executed" }

    callStart()
    client.dispatcher.enqueue(AsyncCall(responseCallback))
  }
  ....
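AsyncCall
An asynchronous call. AsyncCall is an inner class of RealCall that implements Runnable, and it is executed by the thread pool held by the dispatcher.
internal inner class AsyncCall(
  private val responseCallback: Callback
) : Runnable {
  ....
Dispatcher
The dispatcher schedules Call objects. It holds a thread pool together with the asynchronous call queues, which are used to store and execute AsyncCall objects.
Flow analysis
Synchronous request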
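val response = client.newCall(request).execute()
newCall() simply creates a RealCall object, whose execute() method is then called.
override fun execute(): Response {
  // CAS check that the call has not been executed yet, so it can only run once;
  // throws if it has already been executed
  check(executed.compareAndSet(false, true)) { "Already Executed" }

  // Start the timeout
  timeout.enter()
  // Notify the event listener that the call has started
  callStart()
  try {
    // The dispatcher's executed() method only adds the call to the runningSyncCalls queue
    client.dispatcher.executed(this)
    // Obtain the response from getResponseWithInterceptorChain()
    return getResponseWithInterceptorChain()
  } finally {
    // Done: the dispatcher removes the call from the runningSyncCalls queue
    client.dispatcher.finished(this)
  }
}
The dispatcher's executed() method adds the current RealCall object to the runningSyncCalls queue, and then getResponseWithInterceptorChain() is called to obtain the response.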
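The "execute only once" guard at the top of execute() can be tried in isolation. The following is a minimal sketch, not OkHttp code: OneShotCall, newCopy(), and secondExecuteError() are hypothetical names, and the work is a plain lambda instead of a network request. Only the AtomicBoolean.compareAndSet check mirrors what RealCall does.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for RealCall; only the "execute once" guard is modeled.
class OneShotCall(private val work: () -> String) {
  private val executed = AtomicBoolean(false)

  fun execute(): String {
    // compareAndSet flips false -> true atomically, so only one caller can pass.
    check(executed.compareAndSet(false, true)) { "Already Executed" }
    return work()
  }

  // Plays the role of Call.clone(): a new object with a fresh flag.
  fun newCopy(): OneShotCall = OneShotCall(work)
}

// Executes the call twice and returns the message of the second failure, or null.
fun secondExecuteError(call: OneShotCall): String? {
  call.execute()
  return try {
    call.execute()
    null
  } catch (e: IllegalStateException) {
    e.message
  }
}
```

Calling secondExecuteError(OneShotCall { "ok" }) returns the "Already Executed" message, while newCopy() gives an object that can be executed again, which is the reason Call exposes clone().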
Asynchronous request
client.newCall(request).enqueue(object : Callback {
  override fun onFailure(call: Call, e: IOException) {
    e.printStackTrace()
  }

  override fun onResponse(call: Call, response: Response) {
  }
})
RealCall.kt
override fun enqueue(responseCallback: Callback) {
  check(executed.compareAndSet(false, true)) { "Already Executed" }

  // Notify the event listener that the call has started
  callStart()
  // Create an AsyncCall and add it to the readyAsyncCalls queue through the dispatcher's enqueue()
  client.dispatcher.enqueue(AsyncCall(responseCallback))
}
Dispatcher.kt
internal fun enqueue(call: AsyncCall) {
  synchronized(this) {
    readyAsyncCalls.add(call)

    // Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
    // the same host.
    if (!call.call.forWebSocket) {
      // Look for an existing call to the same host; if there is one, share its per-host counter
      val existingCall = findExistingCallWithHost(call.host)
      if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
    }
  }
  // Run the calls
  promoteAndExecute()
}

private fun promoteAndExecute(): Boolean {
  this.assertThreadDoesntHoldLock()

  val executableCalls = mutableListOf<AsyncCall>()
  val isRunning: Boolean
  synchronized(this) {
    // Walk the readyAsyncCalls queue
    val i = readyAsyncCalls.iterator()
    while (i.hasNext()) {
      val asyncCall = i.next()

      // Check the number of running async calls
      if (runningAsyncCalls.size >= this.maxRequests) break // Max capacity.
      // Check the maximum number of calls to the same host
      if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.

      i.remove()
      asyncCall.callsPerHost.incrementAndGet()
      executableCalls.add(asyncCall)
      runningAsyncCalls.add(asyncCall)
    }
    isRunning = runningCallsCount() > 0
  }

  for (i in 0 until executableCalls.size) {
    val asyncCall = executableCalls[i]
    asyncCall.executeOn(executorService)
  }

  return isRunning
}
RealCall.kt
// Inside RealCall.AsyncCall
fun executeOn(executorService: ExecutorService) {
  client.dispatcher.assertThreadDoesntHoldLock()

  var success = false
  try {
    // Run this AsyncCall (a Runnable) on the thread pool
    executorService.execute(this)
    success = true
  } catch (e: RejectedExecutionException) {
    val ioException = InterruptedIOException("executor rejected")
    ioException.initCause(e)
    noMoreExchanges(ioException)
    responseCallback.onFailure(this@RealCall, ioException)
  } finally {
    if (!success) {
      client.dispatcher.finished(this) // This call is no longer running!
    }
  }
}

override fun run() {
  threadName("OkHttp ${redactedUrl()}") {
    var signalledCallback = false
    timeout.enter()
    try {
      // Obtain the response
      val response = getResponseWithInterceptorChain()
      signalledCallback = true
      responseCallback.onResponse(this@RealCall, response)
    } catch (e: IOException) {
      if (signalledCallback) {
        // Do not signal the callback twice!
        Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
      } else {
        responseCallback.onFailure(this@RealCall, e)
      }
    } catch (t: Throwable) {
      cancel()
      if (!signalledCallback) {
        val canceledException = IOException("canceled due to $t")
        canceledException.addSuppressed(t)
        responseCallback.onFailure(this@RealCall, canceledException)
      }
      throw t
    } finally {
      client.dispatcher.finished(this)
    }
  }
}
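The promotion logic in promoteAndExecute() is easier to follow without threads. Below is a simplified, single-threaded sketch under stated assumptions: MiniDispatcher and all of its members are hypothetical, calls are represented only by their host name, and a map replaces the shared AtomicInteger that the real AsyncCall uses for per-host counting. The defaults 64 and 5 are OkHttp's default values for maxRequests and maxRequestsPerHost.

```kotlin
import java.util.ArrayDeque

// Simplified, single-threaded model of Dispatcher.promoteAndExecute().
// All names are hypothetical; a call is represented by its host name only.
class MiniDispatcher(
  private val maxRequests: Int = 64,
  private val maxRequestsPerHost: Int = 5
) {
  private val ready = ArrayDeque<String>()                  // calls waiting to run
  private val runningPerHost = mutableMapOf<String, Int>()  // running calls per host
  private var running = 0                                   // total running calls

  // Queues a call and returns the calls that were started as a result.
  fun enqueue(host: String): List<String> {
    ready.add(host)
    return promote()
  }

  // Marks one running call to this host as done, then promotes waiting calls.
  fun finished(host: String): List<String> {
    running--
    runningPerHost[host] = runningPerHost.getValue(host) - 1
    return promote()
  }

  // Moves eligible calls from ready to running, in arrival order.
  private fun promote(): List<String> {
    val started = mutableListOf<String>()
    val i = ready.iterator()
    while (i.hasNext()) {
      val host = i.next()
      if (running >= maxRequests) break                               // global limit reached
      if ((runningPerHost[host] ?: 0) >= maxRequestsPerHost) continue // host limit reached
      i.remove()
      running++
      runningPerHost[host] = (runningPerHost[host] ?: 0) + 1
      started += host
    }
    return started
  }
}
```

With MiniDispatcher(maxRequests = 2, maxRequestsPerHost = 1), a second call to the same host stays in the ready queue until the first one finishes, and a call to a third host waits while two calls are already running. Note the difference between break and continue: hitting the global limit stops the scan, while hitting a host limit only skips that call.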
Obtaining the Response
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
  // Build a full stack of interceptors.
  val interceptors = mutableListOf<Interceptor>()
  interceptors += client.interceptors
  interceptors += RetryAndFollowUpInterceptor(client)
  interceptors += BridgeInterceptor(client.cookieJar)
  interceptors += CacheInterceptor(client.cache)
  interceptors += ConnectInterceptor
  if (!forWebSocket) {
    interceptors += client.networkInterceptors
  }
  interceptors += CallServerInterceptor(forWebSocket)

  // The interceptor chain (chain of responsibility)
  val chain = RealInterceptorChain(
      call = this,
      interceptors = interceptors,
      index = 0,
      exchange = null,
      request = originalRequest,
      connectTimeoutMillis = client.connectTimeoutMillis,
      readTimeoutMillis = client.readTimeoutMillis,
      writeTimeoutMillis = client.writeTimeoutMillis
  )

  var calledNoMoreExchanges = false
  try {
    // Run the chain to obtain the response
    val response = chain.proceed(originalRequest)
    if (isCanceled()) {
      response.closeQuietly()
      throw IOException("Canceled")
    }
    return response
  } catch (e: IOException) {
    calledNoMoreExchanges = true
    throw noMoreExchanges(e) as Throwable
  } finally {
    if (!calledNoMoreExchanges) {
      noMoreExchanges(null)
    }
  }
}
RealInterceptorChain.kt
@Throws(IOException::class)
override fun proceed(request: Request): Response {
  check(index < interceptors.size)

  calls++

  if (exchange != null) {
    check(exchange.finder.sameHostAndPort(request.url)) {
      "network interceptor ${interceptors[index - 1]} must retain the same host and port"
    }
    check(calls == 1) {
      "network interceptor ${interceptors[index - 1]} must call proceed() exactly once"
    }
  }

  // Call the next interceptor in the chain.
  // Copy the chain, advanced by one position
  val next = copy(index = index + 1, request = request)
  val interceptor = interceptors[index]

  // Run the interceptor's intercept() method
  @Suppress("USELESS_ELVIS")
  val response = interceptor.intercept(next) ?: throw NullPointerException(
      "interceptor $interceptor returned null")

  if (exchange != null) {
    check(index + 1 >= interceptors.size || next.calls == 1) {
      "network interceptor $interceptor must call proceed() exactly once"
    }
  }

  check(response.body != null) { "interceptor $interceptor returned a response with no body" }

  return response
}
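Interceptor
This interface declares a single method, intercept(). It also contains a nested Chain interface whose core method, proceed(request), processes the request and returns the response.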
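The relationship between intercept() and proceed() can be shown with a miniature chain. This is a sketch under simplifying assumptions, not OkHttp's implementation: MiniInterceptor, MiniChain, and runMiniChain() are hypothetical names, and requests and responses are plain strings. What it keeps from RealInterceptorChain is the core idea that proceed() hands the next interceptor a copy of the chain with index + 1.

```kotlin
// Hypothetical miniature of Interceptor and Interceptor.Chain.
// Requests and responses are plain Strings to keep the sketch self-contained.
fun interface MiniInterceptor {
  fun intercept(chain: MiniChain): String
}

class MiniChain(
  private val interceptors: List<MiniInterceptor>,
  private val index: Int,
  val request: String
) {
  fun proceed(request: String): String {
    check(index < interceptors.size) { "no interceptor left to produce a response" }
    // Hand the current interceptor a chain that points one position further along.
    val next = MiniChain(interceptors, index + 1, request)
    return interceptors[index].intercept(next)
  }
}

fun runMiniChain(): String {
  val addHeader = MiniInterceptor { chain ->
    val response = chain.proceed(chain.request + " +header") // request travels down
    "$response +logged"                                      // response travels back up
  }
  // Terminal interceptor: produces the response and never calls proceed().
  val server = MiniInterceptor { chain -> "200 for [${chain.request}]" }
  return MiniChain(listOf(addHeader, server), 0, "GET /").proceed("GET /")
}
```

Each interceptor sees the request on the way down and the response on the way back up, so code before the proceed() call modifies the request and code after it post-processes the response.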
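The interceptors
client.interceptors: interceptors defined by the developer. They run before all other interceptors, so they are the earliest point of interception. They can be used to add common parameters such as custom headers, or for custom logging.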
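import java.util.concurrent.TimeUnit
import okhttp3.Interceptor
import okhttp3.OkHttpClient
import okhttp3.Response

class HeadInterceptor : Interceptor {
  override fun intercept(chain: Interceptor.Chain): Response {
    // Add a common header to every request, then continue down the chain
    val request = chain.request().newBuilder()
        .addHeader("device-serial", "********8")
        .build()
    return chain.proceed(request)
  }
}

fun test() {
  // Timeouts are given in seconds; values of a few milliseconds would fail almost every request
  val client = OkHttpClient.Builder()
      .connectTimeout(60, TimeUnit.SECONDS)
      .readTimeout(15, TimeUnit.SECONDS)
      .writeTimeout(15, TimeUnit.SECONDS)
      .addInterceptor(HeadInterceptor())
      .build()
}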
RetryAndFollowUpInterceptor: performs some initialization for the connection, retries failed requests, and issues follow-up requests for redirects.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  var request = chain.request
  val call = realChain.call
  var followUpCount = 0
  var priorResponse: Response? = null
  var newExchangeFinder = true
  var recoveredFailures = listOf<IOException>()
  while (true) {
    // Creates an ExchangeFinder here; ConnectInterceptor will use it
    call.enterNetworkInterceptorExchange(request, newExchangeFinder)

    var response: Response
    var closeActiveExchange = true
    try {
      if (call.isCanceled()) {
        throw IOException("Canceled")
      }

      try {
        response = realChain.proceed(request)
        newExchangeFinder = true
      } catch (e: RouteException) {
        // The attempt to connect via a route failed. The request will not have been sent.
        // Check whether the failure is recoverable and the connection can be retried
        if (!recover(e.lastConnectException, call, request, requestSendStarted = false)) {
          throw e.firstConnectException.withSuppressed(recoveredFailures)
        } else {
          recoveredFailures += e.firstConnectException
        }
        newExchangeFinder = false
        continue
      } catch (e: IOException) {
        // An attempt to communicate with a server failed. The request may have been sent.
        if (!recover(e, call, request, requestSendStarted = e !is ConnectionShutdownException)) {
          throw e.withSuppressed(recoveredFailures)
        } else {
          recoveredFailures += e
        }
        newExchangeFinder = false
        continue
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                .body(null)
                .build())
            .build()
      }

      val exchange = call.interceptorScopedExchange
      // Based on the response code, decide whether to build a new request for a retry or redirect
      val followUp = followUpRequest(response, exchange)

      if (followUp == null) {
        if (exchange != null && exchange.isDuplex) {
          call.timeoutEarlyExit()
        }
        closeActiveExchange = false
        return response
      }

      val followUpBody = followUp.body
      if (followUpBody != null && followUpBody.isOneShot()) {
        closeActiveExchange = false
        return response
      }

      response.body?.closeQuietly()

      if (++followUpCount > MAX_FOLLOW_UPS) {
        throw ProtocolException("Too many follow-up requests: $followUpCount")
      }

      request = followUp
      priorResponse = response
    } finally {
      call.exitNetworkInterceptorExchange(closeActiveExchange)
    }
  }
}

/**
 * Report and attempt to recover from a failure to communicate with a server. Returns true if
 * `e` is recoverable, or false if the failure is permanent. Requests with a body can only
 * be recovered if the body is buffered or if the failure occurred before the request has been
 * sent.
 */
// Decides whether the request should be retried
private fun recover(
  e: IOException,
  call: RealCall,
  userRequest: Request,
  requestSendStarted: Boolean
): Boolean {
  // The application layer has forbidden retries.
  if (!client.retryOnConnectionFailure) return false

  // We can't send the request body again.
  if (requestSendStarted && requestIsOneShot(e, userRequest)) return false

  // This exception is fatal.
  if (!isRecoverable(e, requestSendStarted)) return false

  // No more routes to attempt.
  if (!call.retryAfterFailure()) return false

  // For failure recovery, use the same route selector with a new connection.
  return true
}
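The follow-up part of the loop above can be reduced to a small model. The sketch below is an illustration only: MiniResponse, followRedirects(), and the fetch parameter are hypothetical, and the stand-in for followUpRequest() treats every 3xx response with a location as a redirect, whereas the real method also handles authentication challenges and several other status codes. The limit of 20 matches MAX_FOLLOW_UPS in RetryAndFollowUpInterceptor.

```kotlin
import java.net.ProtocolException

// Simplified model of the follow-up loop; MiniResponse and fetch are hypothetical.
data class MiniResponse(val code: Int, val location: String? = null)

val MAX_FOLLOW_UPS = 20 // same limit as RetryAndFollowUpInterceptor

// Fetches startUrl and follows redirects; returns the final URL and its response.
fun followRedirects(
  startUrl: String,
  fetch: (String) -> MiniResponse
): Pair<String, MiniResponse> {
  var url = startUrl
  var followUpCount = 0
  while (true) {
    val response = fetch(url)
    // Stand-in for followUpRequest(): only 3xx responses with a location are followed.
    val followUp = if (response.code in 300..399) response.location else null
    if (followUp == null) return url to response
    if (++followUpCount > MAX_FOLLOW_UPS) {
      throw ProtocolException("Too many follow-up requests: $followUpCount")
    }
    url = followUp
  }
}
```

A server that redirects to itself forever makes followRedirects() throw a ProtocolException on the 21st follow-up, which is the same protection the interceptor gives against redirect loops.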
BridgeInterceptor: the bridge between the client and the server. It converts the request built by the user into the request the server needs, and converts the response returned by the network into a response the user can consume.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val userRequest = chain.request()
  val requestBuilder = userRequest.newBuilder()

  val body = userRequest.body
  if (body != null) {
    val contentType = body.contentType()
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString())
    }

    val contentLength = body.contentLength()
    if (contentLength != -1L) {
      requestBuilder.header("Content-Length", contentLength.toString())
      requestBuilder.removeHeader("Transfer-Encoding")
    } else {
      requestBuilder.header("Transfer-Encoding", "chunked")
      requestBuilder.removeHeader("Content-Length")
    }
  }

  if (userRequest.header("Host") == null) {
    requestBuilder.header("Host", userRequest.url.toHostHeader())
  }

  if (userRequest.header("Connection") == null) {
    requestBuilder.header("Connection", "Keep-Alive")
  }

  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
  // the transfer stream.
  var transparentGzip = false
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true
    requestBuilder.header("Accept-Encoding", "gzip")
  }

  val cookies = cookieJar.loadForRequest(userRequest.url)
  if (cookies.isNotEmpty()) {
    requestBuilder.header("Cookie", cookieHeader(cookies))
  }

  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", userAgent)
  }

  // Run the next interceptor
  val networkResponse = chain.proceed(requestBuilder.build())

  cookieJar.receiveHeaders(userRequest.url, networkResponse.headers)

  val responseBuilder = networkResponse.newBuilder()
      .request(userRequest)

  if (transparentGzip &&
      "gzip".equals(networkResponse.header("Content-Encoding"), ignoreCase = true) &&
      networkResponse.promisesBody()) {
    val responseBody = networkResponse.body
    if (responseBody != null) {
      val gzipSource = GzipSource(responseBody.source())
      val strippedHeaders = networkResponse.headers.newBuilder()
          .removeAll("Content-Encoding")
          .removeAll("Content-Length")
          .build()
      responseBuilder.headers(strippedHeaders)
      val contentType = networkResponse.header("Content-Type")
      responseBuilder.body(RealResponseBody(contentType, -1L, gzipSource.buffer()))
    }
  }

  return responseBuilder.build()
}
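The request half of BridgeInterceptor is mostly a set of "fill in if missing" rules. The sketch below restates a subset of those rules on a plain map, as an illustration rather than a reimplementation: withDefaultHeaders() and its parameters are hypothetical, header names are compared case-sensitively for brevity (real HTTP header names are case-insensitive), and cookies, Content-Type, and User-Agent are left out.

```kotlin
// Hypothetical helper that mirrors some of the header defaults BridgeInterceptor fills in.
// Header names are matched case-sensitively here, which real HTTP does not do.
fun withDefaultHeaders(
  userHeaders: Map<String, String>,
  host: String,
  contentLength: Long? // null: no request body, -1: body of unknown length
): Map<String, String> {
  val headers = LinkedHashMap(userHeaders)
  if (contentLength != null) {
    if (contentLength != -1L) {
      // Known length: send Content-Length and drop chunked encoding
      headers["Content-Length"] = contentLength.toString()
      headers.remove("Transfer-Encoding")
    } else {
      // Unknown length: fall back to chunked transfer encoding
      headers["Transfer-Encoding"] = "chunked"
      headers.remove("Content-Length")
    }
  }
  // Values supplied by the caller always win over the defaults
  headers.putIfAbsent("Host", host)
  headers.putIfAbsent("Connection", "Keep-Alive")
  // Ask for gzip only if the caller set no encoding preference and is not doing a range request
  if ("Accept-Encoding" !in userHeaders && "Range" !in userHeaders) {
    headers["Accept-Encoding"] = "gzip"
  }
  return headers
}
```

The gzip rule is the one worth remembering: because the interceptor adds Accept-Encoding itself, it also takes responsibility for decompressing the response, which is what the transparentGzip branch in the source does.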
CacheInterceptor: handles caching. Based on the cache configured on OkHttpClient and on the request, it creates a cache strategy, and that strategy decides whether the response is built from the network or from the cache.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val call = chain.call()
  val cacheCandidate = cache?.get(chain.request())

  val now = System.currentTimeMillis()

  // Create the cache strategy
  val strategy = CacheStrategy.Factory(now, chain.request(), cacheCandidate).compute()
  // The request to send on the network; null if the network should not be used
  val networkRequest = strategy.networkRequest
  // The cached response to use; null if the cache should not be used
  val cacheResponse = strategy.cacheResponse

  // Track how often the network and the cache are used
  cache?.trackResponse(strategy)
  val listener = (call as? RealCall)?.eventListener ?: EventListener.NONE

  if (cacheCandidate != null && cacheResponse == null) {
    // The cache candidate wasn't applicable. Close it.
    cacheCandidate.body?.closeQuietly()
  }

  // If we're forbidden from using the network and the cache is insufficient, fail.
  // (Returns a synthetic response with code 504.)
  if (networkRequest == null && cacheResponse == null) {
    return Response.Builder()
        .request(chain.request())
        .protocol(Protocol.HTTP_1_1)
        .code(HTTP_GATEWAY_TIMEOUT)
        .message("Unsatisfiable Request (only-if-cached)")
        .body(EMPTY_RESPONSE)
        .sentRequestAtMillis(-1L)
        .receivedResponseAtMillis(System.currentTimeMillis())
        .build().also {
          listener.satisfactionFailure(call, it)
        }
  }

  // If we don't need the network, we're done.
  // (The response is built from the cached data.)
  if (networkRequest == null) {
    return cacheResponse!!.newBuilder()
        .cacheResponse(stripBody(cacheResponse))
        .build().also {
          listener.cacheHit(call, it)
        }
  }

  if (cacheResponse != null) {
    listener.cacheConditionalHit(call, cacheResponse)
  } else if (cache != null) {
    listener.cacheMiss(call)
  }

  var networkResponse: Response? = null
  try {
    networkResponse = chain.proceed(networkRequest)
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      cacheCandidate.body?.closeQuietly()
    }
  }

  // If we have a cache response too, then we're doing a conditional get.
  if (cacheResponse != null) {
    // With a cached response and a 304 from the server, build the response from the cache
    if (networkResponse?.code == HTTP_NOT_MODIFIED) {
      val response = cacheResponse.newBuilder()
          .headers(combine(cacheResponse.headers, networkResponse.headers))
          .sentRequestAtMillis(networkResponse.sentRequestAtMillis)
          .receivedResponseAtMillis(networkResponse.receivedResponseAtMillis)
          .cacheResponse(stripBody(cacheResponse))
          .networkResponse(stripBody(networkResponse))
          .build()

      networkResponse.body!!.close()

      // Update the cache after combining headers but before stripping the
      // Content-Encoding header (as performed by initContentStream()).
      cache!!.trackConditionalCacheHit()
      cache.update(cacheResponse, response)
      return response.also {
        listener.cacheHit(call, it)
      }
    } else {
      cacheResponse.body?.closeQuietly()
    }
  }

  val response = networkResponse!!.newBuilder()
      .cacheResponse(stripBody(cacheResponse))
      .networkResponse(stripBody(networkResponse))
      .build()

  if (cache != null) {
    if (response.promisesBody() && CacheStrategy.isCacheable(response, networkRequest)) {
      // Offer this request to the cache.
      val cacheRequest = cache.put(response)
      return cacheWritingResponse(cacheRequest, response).also {
        if (cacheResponse != null) {
          // This will log a conditional cache miss only.
          listener.cacheMiss(call)
        }
      }
    }

    // Some request methods invalidate the cached entry (only GET responses are cached)
    if (HttpMethod.invalidatesCache(networkRequest.method)) {
      try {
        cache.remove(networkRequest)
      } catch (_: IOException) {
        // The cache cannot be written.
      }
    }
  }

  return response
}
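The branches of intercept() above boil down to a four-way decision driven by the two outputs of the cache strategy. The following sketch summarizes that decision; ResponseSource and chooseSource() are hypothetical names, and the two booleans stand for strategy.networkRequest != null and strategy.cacheResponse != null. Cache writes and invalidation are deliberately left out.

```kotlin
// Hypothetical summary of the branches in CacheInterceptor.intercept().
enum class ResponseSource { GATEWAY_TIMEOUT_504, CACHE, CACHE_REVALIDATED, NETWORK }

fun chooseSource(
  needsNetwork: Boolean,  // stands for strategy.networkRequest != null
  hasCached: Boolean,     // stands for strategy.cacheResponse != null
  networkCode: () -> Int  // status code from the network; only invoked if the network is used
): ResponseSource {
  // Network forbidden and nothing usable in the cache: synthetic 504
  if (!needsNetwork && !hasCached) return ResponseSource.GATEWAY_TIMEOUT_504
  // Cache is sufficient: no network round trip at all
  if (!needsNetwork) return ResponseSource.CACHE
  val code = networkCode()
  // Conditional GET answered with 304: reuse the cached body with merged headers
  return if (hasCached && code == 304) ResponseSource.CACHE_REVALIDATED
  else ResponseSource.NETWORK
}
```

Passing the status code as a lambda makes the point that the first two branches return before any network call happens, just as the source returns before chain.proceed() is reached.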
ConnectInterceptor: mainly responsible for establishing the connection, which is either a TCP or a TLS connection.
@Throws(IOException::class)
override fun intercept(chain: Interceptor.Chain): Response {
  val realChain = chain as RealInterceptorChain
  val exchange = realChain.call.initExchange(chain)
  val connectedChain = realChain.copy(exchange = exchange)
  return connectedChain.proceed(realChain.request)
}
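client.networkInterceptors: these are also set by the developer, but they sit at a different position in the chain and therefore serve a different purpose. They run after ConnectInterceptor, so the connection already exists; they are invoked for every network request, including retries and redirect follow-ups; they are not invoked when CacheInterceptor answers from the cache; and they are skipped for WebSocket calls.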
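The difference in position can be made visible by counting invocations. The sketch below is a hypothetical string-based mini chain, not OkHttp code: Step, StepChain, and countInvocations() are invented for illustration, the followUp step stands in for RetryAndFollowUpInterceptor and follows a single redirect, and the server step stands in for CallServerInterceptor.

```kotlin
// Hypothetical mini chain (Strings instead of Request/Response) contrasting the two hook points.
fun interface Step {
  fun intercept(chain: StepChain): String
}

class StepChain(
  private val steps: List<Step>,
  private val index: Int,
  val request: String
) {
  fun proceed(request: String): String =
    steps[index].intercept(StepChain(steps, index + 1, request))
}

// Returns how often the application and the network interceptor ran for one call.
fun countInvocations(): Pair<Int, Int> {
  var appRuns = 0
  var networkRuns = 0
  val application = Step { chain -> appRuns++; chain.proceed(chain.request) }
  // Stand-in for RetryAndFollowUpInterceptor: follows a single redirect.
  val followUp = Step { chain ->
    val first = chain.proceed(chain.request)
    if (first.startsWith("302 ")) chain.proceed(first.removePrefix("302 ")) else first
  }
  val network = Step { chain -> networkRuns++; chain.proceed(chain.request) }
  // Stand-in for CallServerInterceptor: "/old" redirects to "/new".
  val server = Step { chain -> if (chain.request == "/old") "302 /new" else "200 OK" }
  StepChain(listOf(application, followUp, network, server), 0, "/old").proceed("/old")
  return appRuns to networkRuns
}
```

For a call that is redirected once, the application interceptor runs once and the network interceptor runs twice, because the follow-up step sits between them and calls proceed() a second time.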
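CallServerInterceptor: this is where the network request and response actually happen. It sends the request headers and request body to the server and parses the response the server returns.
The overall call flow, in the order the interceptors are added in getResponseWithInterceptorChain(), is: client.interceptors, RetryAndFollowUpInterceptor, BridgeInterceptor, CacheInterceptor, ConnectInterceptor, client.networkInterceptors, CallServerInterceptor.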
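Key points:
Builder pattern: OkHttpClient, Request, and Response all use the builder pattern, because each of these classes has many parameters and users need to pick the ones they want when building an instance.
Factory method pattern: helps create complex objects, for example OkHttpClient.newCall(request: Request) creates the Call object.
Chain of responsibility pattern: the interceptors are assembled into a chain and executed in order from top to bottom; once the response is obtained, it is passed back up from bottom to top.
Thread safety
Thread safety is achieved with atomic classes (such as the atomic executed flag in RealCall and the per-host AtomicInteger counter in AsyncCall) and with reentrant locking (the synchronized blocks in Dispatcher).
Data structures
Calls are held in queues (readyAsyncCalls, runningAsyncCalls, and runningSyncCalls), which fits the overall needs of network requests, such as first come, first served.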