@Overridepublic Response intercept(Chain chain)throws IOException { RealInterceptorChainrealChain= (RealInterceptorChain) chain; Requestrequest= realChain.request(); Transmittertransmitter= realChain.transmitter(); // We need the network to satisfy this request. Possibly for validating a conditional GET. booleandoExtensiveHealthChecks= !request.method().equals("GET"); Exchangeexchange= transmitter.newExchange(chain, doExtensiveHealthChecks); return realChain.proceed(request, transmitter, exchange); }
Exchange可以传输HTTP请求和响应,并管理连接和事件。 newExchange方法调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
/** Returns a new exchange to carry a new request and response. */ Exchange newExchange(Interceptor.Chain chain, boolean doExtensiveHealthChecks) { synchronized (connectionPool) { if (noMoreExchanges) { thrownewIllegalStateException("released"); } if (exchange != null) { thrownewIllegalStateException("cannot make a new request because the previous response " + "is still open: please call response.close()"); } } ExchangeCodeccodec= exchangeFinder.find(client, chain, doExtensiveHealthChecks); Exchangeresult=newExchange(this, call, eventListener, exchangeFinder, codec); ...... } }
/** * Returns a connection to host a new stream. This prefers the existing connection if it exists, * then the pool, finally building a new connection. */ private RealConnection findConnection(int connectTimeout, int readTimeout, int writeTimeout, int pingIntervalMillis, boolean connectionRetryEnabled)throws IOException { booleanfoundPooledConnection=false; RealConnectionresult=null; RouteselectedRoute=null; RealConnection releasedConnection; Socket toClose; synchronized (connectionPool) { if (transmitter.isCanceled()) thrownewIOException("Canceled"); ......
if (result == null) { //2.根据 Address 从连接池获取连接 // Attempt to get a connection from the pool. if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, null, false)) { foundPooledConnection = true; result = transmitter.connection; } elseif (nextRouteToTry != null) { selectedRoute = nextRouteToTry; nextRouteToTry = null; } elseif (retryCurrentRoute()) { selectedRoute = transmitter.connection.route(); } } } ...... // 3. 重新选择路由 // If we need a route selection, make one. This is a blocking operation. booleannewRouteSelection=false; if (selectedRoute == null && (routeSelection == null || !routeSelection.hasNext())) { newRouteSelection = true; routeSelection = routeSelector.next(); }
List<Route> routes = null; synchronized (connectionPool) { if (transmitter.isCanceled()) thrownewIOException("Canceled");
if (newRouteSelection) { // Now that we have a set of IP addresses, make another attempt at getting a connection from // the pool. This could match due to connection coalescing. routes = routeSelection.getAll(); if (connectionPool.transmitterAcquirePooledConnection( address, transmitter, routes, false)) { foundPooledConnection = true; result = transmitter.connection; } }
if (!foundPooledConnection) { if (selectedRoute == null) { selectedRoute = routeSelection.next(); } // 3. 重新选择路由,创建新的 `RealConnection` // Create a connection and assign it to this allocation immediately. This makes it possible // for an asynchronous cancel() to interrupt the handshake we're about to do. result = newRealConnection(connectionPool, selectedRoute); connectingConnection = result; } }
...... // 4. 进行 Socket 连接 // Do TCP + TLS handshakes. This is a blocking operation. result.connect(connectTimeout, readTimeout, writeTimeout, pingIntervalMillis, connectionRetryEnabled, call, eventListener); connectionPool.routeDatabase.connected(result.route());
Socketsocket=null; synchronized (connectionPool) { connectingConnection = null; // Last attempt at connection coalescing, which only occurs if we attempted multiple // concurrent connections to the same host. if (connectionPool.transmitterAcquirePooledConnection(address, transmitter, routes, true)) { // We lost the race! Close the connection we created and return the pooled connection. result.noNewExchanges = true; socket = result.socket(); result = transmitter.connection; } else { //把连接放入连接池中 connectionPool.put(result); transmitter.acquireConnectionNoEvents(result); } } ...... return result; }
/** Reads headers or trailers. */ private Headers readHeaders()throws IOException { Headers.Builderheaders=newHeaders.Builder(); // parse the result headers until the first blank line for (String line; (line = readHeaderLine()).length() != 0; ) { Internal.instance.addLenient(headers, line); } return headers.build(); }
response body
解析 response body 内容:
1 2 3 4 5 6 7 8 9 10
if (forWebSocket && code == 101) { // Connection is upgrading, but we need to ensure interceptors see a non-null response body. response = response.newBuilder() .body(Util.EMPTY_RESPONSE) .build(); } else { response = response.newBuilder() .body(exchange.openResponseBody(response)) .build(); }