tomcat源码线程池相关

JIoEndpoint的内部类Acceptor,Acceptor实现了Runnable接口。Acceptor作为后台线程不断循环,每次循环都会接收来自浏览器的Socket连接
最后将Socket交给外部类JIoEndpoint的processSocket方法处理。

 

交给worker线程去完成的,这其中使用了线程池的技术。

protected boolean processSocket(Socket socket) {
	// Process the request from this socket
	try {
		SocketWrapper<Socket> wrapper = new SocketWrapper<Socket>(socket);
		wrapper.setKeepAliveLeft(getMaxKeepAliveRequests());
		wrapper.setSecure(isSSLEnabled());
		// During shutdown, executor may be null - avoid NPE
		if (!running) {
			return false;
		}
                //交给worker线程去完成的,这其中使用了线程池的技术。
		getExecutor().execute(new SocketProcessor(wrapper));
	} catch (RejectedExecutionException x) {
		log.warn("Socket processing request was rejected for:"+socket,x);
		return false;
	} catch (Throwable t) {
		ExceptionUtils.handleThrowable(t);
		// This means we got an OOM or similar creating a thread, or that
		// the pool and its queue are full
		log.error(sm.getString("endpoint.process.fail"), t);
		return false;
	}
	return true;
}

将Socket封装为SocketWrapper;
给SocketWrapper设置连接保持时间keepAliveLeft;
创建SocketProcessor(此类也是JIoEndpoint的内部类,而且也实现了Runnable接口),并使用线程池执行。

 

protected class SocketProcessor implements Runnable {

	protected SocketWrapper<Socket> socket = null;
	protected SocketStatus status = null;

	public SocketProcessor(SocketWrapper<Socket> socket) {
		if (socket==null) throw new NullPointerException();
		this.socket = socket;
	}

	public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
		this(socket);
		this.status = status;
	}

	@Override
	public void run() {
		boolean launch = false;
		synchronized (socket) {
			try {
				SocketState state = SocketState.OPEN;

				try {
					// SSL handshake
					serverSocketFactory.handshake(socket.getSocket());
				} catch (Throwable t) {
					ExceptionUtils.handleThrowable(t);
					if (log.isDebugEnabled()) {
						log.debug(sm.getString("endpoint.err.handshake"), t);
					}
					// Tell to close the socket
					state = SocketState.CLOSED;
				}

				if ((state != SocketState.CLOSED)) {
					if (status == null) {
						state = handler.process(socket, SocketStatus.OPEN_READ);
					} else {
						state = handler.process(socket,status);
					}
				}
				if (state == SocketState.CLOSED) {
					// Close socket
					if (log.isTraceEnabled()) {
						log.trace("Closing socket:"+socket);
					}
					countDownConnection();
					try {
						socket.getSocket().close();
					} catch (IOException e) {
						// Ignore
					}
				} else if (state == SocketState.OPEN ||
						state == SocketState.UPGRADING ||
						state == SocketState.UPGRADING_TOMCAT  ||
						state == SocketState.UPGRADED){
					socket.setKeptAlive(true);
					socket.access();
					launch = true;
				} else if (state == SocketState.LONG) {
					socket.access();
					waitingRequests.add(socket);
				}
			} finally {
				if (launch) {
					try {
						getExecutor().execute(new SocketProcessor(socket, SocketStatus.OPEN_READ));
					} catch (RejectedExecutionException x) {
						log.warn("Socket reprocessing request was rejected for:"+socket,x);
						try {
							//unable to handle connection at this time
							handler.process(socket, SocketStatus.DISCONNECT);
						} finally {
							countDownConnection();
						}


					} catch (NullPointerException npe) {
						if (running) {
							log.error(sm.getString("endpoint.launch.fail"),
									npe);
						}
					}
				}
			}
		}
		socket = null;
		// Finish up this request
	}

}

handler的process方法处理请求

public SocketState process(SocketWrapper<S> wrapper,
			SocketStatus status) {
		if (wrapper == null) {
			// Nothing to do. Socket has been closed.
			return SocketState.CLOSED;
		}

		S socket = wrapper.getSocket();
		if (socket == null) {
			// Nothing to do. Socket has been closed.
			return SocketState.CLOSED;
		}

		Processor<S> processor = connections.get(socket);
		if (status == SocketStatus.DISCONNECT && processor == null) {
			// Nothing to do. Endpoint requested a close and there is no
			// longer a processor associated with this socket.
			return SocketState.CLOSED;
		}

		wrapper.setAsync(false);
		ContainerThreadMarker.markAsContainerThread();

		try {
			if (processor == null) {
				processor = recycledProcessors.poll();
			}
			if (processor == null) {
				processor = createProcessor();
			}

			initSsl(wrapper, processor);

			SocketState state = SocketState.CLOSED;
			do {
				if (status == SocketStatus.CLOSE_NOW) {
					processor.errorDispatch();
					state = SocketState.CLOSED;
				} else if (status == SocketStatus.DISCONNECT &&
						!processor.isComet()) {
					// Do nothing here, just wait for it to get recycled
					// Don't do this for Comet we need to generate an end
					// event (see BZ 54022)
				} else if (processor.isAsync() ||
						state == SocketState.ASYNC_END) {
					state = processor.asyncDispatch(status);
				} else if (processor.isComet()) {
					state = processor.event(status);
				} else if (processor.getUpgradeInbound() != null) {
					state = processor.upgradeDispatch();
				} else if (processor.isUpgrade()) {
					state = processor.upgradeDispatch(status);
				} else {
					state = processor.process(wrapper);
				}

				if (state != SocketState.CLOSED && processor.isAsync()) {
					state = processor.asyncPostProcess();
				}

				if (state == SocketState.UPGRADING) {
					// Get the HTTP upgrade handler
					HttpUpgradeHandler httpUpgradeHandler =
							processor.getHttpUpgradeHandler();
					// Release the Http11 processor to be re-used
					release(wrapper, processor, false, false);
					// Create the upgrade processor
					processor = createUpgradeProcessor(
							wrapper, httpUpgradeHandler);
					// Mark the connection as upgraded
					wrapper.setUpgraded(true);
					// Associate with the processor with the connection
					connections.put(socket, processor);
					// Initialise the upgrade handler (which may trigger
					// some IO using the new protocol which is why the lines
					// above are necessary)
					// This cast should be safe. If it fails the error
					// handling for the surrounding try/catch will deal with
					// it.
					httpUpgradeHandler.init((WebConnection) processor);
				} else if (state == SocketState.UPGRADING_TOMCAT) {
					// Get the UpgradeInbound handler
					org.apache.coyote.http11.upgrade.UpgradeInbound inbound =
							processor.getUpgradeInbound();
					// Release the Http11 processor to be re-used
					release(wrapper, processor, false, false);
					// Create the light-weight upgrade processor
					processor = createUpgradeProcessor(wrapper, inbound);
					inbound.onUpgradeComplete();
				}
				if (getLog().isDebugEnabled()) {
					getLog().debug("Socket: [" + wrapper +
							"], Status in: [" + status +
							"], State out: [" + state + "]");
				}
			} while (state == SocketState.ASYNC_END ||
					state == SocketState.UPGRADING ||
					state == SocketState.UPGRADING_TOMCAT);

			if (state == SocketState.LONG) {
				// In the middle of processing a request/response. Keep the
				// socket associated with the processor. Exact requirements
				// depend on type of long poll
				connections.put(socket, processor);
				longPoll(wrapper, processor);
			} else if (state == SocketState.OPEN) {
				// In keep-alive but between requests. OK to recycle
				// processor. Continue to poll for the next request.
				connections.remove(socket);
				release(wrapper, processor, false, true);
			} else if (state == SocketState.SENDFILE) {
				// Sendfile in progress. If it fails, the socket will be
				// closed. If it works, the socket will be re-added to the
				// poller
				connections.remove(socket);
				release(wrapper, processor, false, false);
			} else if (state == SocketState.UPGRADED) {
				// Need to keep the connection associated with the processor
				connections.put(socket, processor);
				// Don't add sockets back to the poller if this was a
				// non-blocking write otherwise the poller may trigger
				// multiple read events which may lead to thread starvation
				// in the connector. The write() method will add this socket
				// to the poller if necessary.
				if (status != SocketStatus.OPEN_WRITE) {
					longPoll(wrapper, processor);
				}
			} else {
				// Connection closed. OK to recycle the processor. Upgrade
				// processors are not recycled.
				connections.remove(socket);
				if (processor.isUpgrade()) {
					processor.getHttpUpgradeHandler().destroy();
				} else if (processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor) {
					// NO-OP
				} else {
					release(wrapper, processor, true, false);
				}
			}
			return state;
		} catch(java.net.SocketException e) {
			// SocketExceptions are normal
			getLog().debug(sm.getString(
					"abstractConnectionHandler.socketexception.debug"), e);
		} catch (java.io.IOException e) {
			// IOExceptions are normal
			getLog().debug(sm.getString(
					"abstractConnectionHandler.ioexception.debug"), e);
		}
		// Future developers: if you discover any other
		// rare-but-nonfatal exceptions, catch them here, and log as
		// above.
		catch (Throwable e) {
			ExceptionUtils.handleThrowable(e);
			// any other exception or error is odd. Here we log it
			// with "ERROR" level, so it will show up even on
			// less-than-verbose logs.
			getLog().error(
					sm.getString("abstractConnectionHandler.error"), e);
		}
		// Make sure socket/processor is removed from the list of current
		// connections
		connections.remove(socket);
		// Don't try to add upgrade processors back into the pool
		if (!(processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor)
				&& !processor.isUpgrade()) {
			release(wrapper, processor, true, false);
		}
		return SocketState.CLOSED;
	}

最后的请求处理交给了CoyoteAdapter,CoyoteAdapter的service方法用于真正处理请求。

 

public void service(org.apache.coyote.Request req,
					org.apache.coyote.Response res)
	throws Exception {

	Request request = (Request) req.getNote(ADAPTER_NOTES);
	Response response = (Response) res.getNote(ADAPTER_NOTES);

	if (request == null) {

		// Create objects
		request = connector.createRequest();
		request.setCoyoteRequest(req);
		response = connector.createResponse();
		response.setCoyoteResponse(res);

		// Link objects
		request.setResponse(response);
		response.setRequest(request);

		// Set as notes
		req.setNote(ADAPTER_NOTES, request);
		res.setNote(ADAPTER_NOTES, response);

		// Set query string encoding
		req.getParameters().setQueryStringEncoding
			(connector.getURIEncoding());

	}

	if (connector.getXpoweredBy()) {
		response.addHeader("X-Powered-By", POWERED_BY);
	}

	boolean comet = false;
	boolean async = false;

	try {

		// Parse and set Catalina and configuration specific
		// request parameters
		req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName());
		boolean postParseSuccess = postParseRequest(req, request, res, response);
		if (postParseSuccess) {
			//check valves if we support async
			request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
			// Calling the container
			connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

			if (request.isComet()) {
				if (!response.isClosed() && !response.isError()) {
					if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) {
						// Invoke a read event right away if there are available bytes
						if (event(req, res, SocketStatus.OPEN_READ)) {
							comet = true;
							res.action(ActionCode.COMET_BEGIN, null);
						}
					} else {
						comet = true;
						res.action(ActionCode.COMET_BEGIN, null);
					}
				} else {
					// Clear the filter chain, as otherwise it will not be reset elsewhere
					// since this is a Comet request
					request.setFilterChain(null);
				}
			}

		}
		AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
		if (asyncConImpl != null) {
			async = true;
		} else if (!comet) {
			request.finishRequest();
			response.finishResponse();
			if (postParseSuccess &&
					request.getMappingData().context != null) {
				// Log only if processing was invoked.
				// If postParseRequest() failed, it has already logged it.
				// If context is null this was the start of a comet request
				// that failed and has already been logged.
				((Context) request.getMappingData().context).logAccess(
						request, response,
						System.currentTimeMillis() - req.getStartTime(),
						false);
			}
			req.action(ActionCode.POST_REQUEST , null);
		}

	} catch (IOException e) {
		// Ignore
	} finally {
		req.getRequestProcessor().setWorkerThreadName(null);
		AtomicBoolean error = new AtomicBoolean(false);
		res.action(ActionCode.IS_ERROR, error);
		// Recycle the wrapper request and response
		if (!comet && !async || error.get()) {
			request.recycle();
			response.recycle();
		} else {
			// Clear converters so that the minimum amount of memory
			// is used by this processor
			request.clearEncoders();
			response.clearEncoders();
		}
	}

}

 

CoyoteAdapter的service方法最后会将请求交给Engine的Pipeline去处理

代码:connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);

发表评论

邮箱地址不会被公开。