Acceptor是JIoEndpoint的内部类,实现了Runnable接口。Acceptor作为后台线程不断循环,每次循环都会接收来自客户端(例如浏览器)的Socket连接,
最后将Socket交给外部类JIoEndpoint的processSocket方法处理。
processSocket方法并不在Acceptor线程中直接处理请求,而是把Socket交给worker线程去完成,这其中使用了线程池的技术。
/**
 * Wraps an accepted socket and hands it to the executor for processing.
 * <p>
 * The socket is wrapped in a {@code SocketWrapper}, configured with the
 * keep-alive request budget and the SSL flag, and then submitted to the
 * executor as a {@code SocketProcessor} task.
 *
 * @param socket the accepted connection to process
 * @return {@code true} if the task was submitted to the executor;
 *         {@code false} if the endpoint is not running, the executor
 *         rejected the task, or any other failure occurred
 */
protected boolean processSocket(Socket socket) {
    boolean dispatched = false;
    try {
        SocketWrapper<Socket> wrapped = new SocketWrapper<Socket>(socket);
        wrapped.setKeepAliveLeft(getMaxKeepAliveRequests());
        wrapped.setSecure(isSSLEnabled());
        // The executor may be null during shutdown, so only dispatch
        // while the endpoint is still running (avoids an NPE).
        if (running) {
            // The actual work is done by a worker thread taken from the
            // thread pool behind the executor.
            getExecutor().execute(new SocketProcessor(wrapped));
            dispatched = true;
        }
    } catch (RejectedExecutionException rejected) {
        log.warn("Socket processing request was rejected for:" + socket, rejected);
    } catch (Throwable failure) {
        ExceptionUtils.handleThrowable(failure);
        // Either an OOM (or similar) occurred while creating a thread, or
        // the pool and its queue are full.
        log.error(sm.getString("endpoint.process.fail"), failure);
    }
    return dispatched;
}
将Socket封装为SocketWrapper;
给SocketWrapper设置keepAliveLeft,其值取自getMaxKeepAliveRequests(),表示该连接上还可以处理的keep-alive请求数(而不是连接保持时间);
创建SocketProcessor(此类也是JIoEndpoint的内部类,而且也实现了Runnable接口),并使用线程池执行。
/**
 * Task that serves one wrapped socket on an executor thread: it performs the
 * SSL handshake, delegates to the handler and then acts on the returned
 * {@code SocketState} (close, re-dispatch, or park as a waiting request).
 */
protected class SocketProcessor implements Runnable {

    /** The connection being served; cleared at the end of {@link #run()}. */
    protected SocketWrapper<Socket> socket = null;

    /** Status passed to the handler; {@code null} is treated as OPEN_READ. */
    protected SocketStatus status = null;

    /**
     * Creates a processor with no explicit status.
     *
     * @param socket the wrapped connection, must not be {@code null}
     * @throws NullPointerException if {@code socket} is {@code null}
     */
    public SocketProcessor(SocketWrapper<Socket> socket) {
        if (socket == null) {
            throw new NullPointerException();
        }
        this.socket = socket;
    }

    /**
     * Creates a processor that passes the given status to the handler.
     *
     * @param socket the wrapped connection, must not be {@code null}
     * @param status the status to hand to the handler
     */
    public SocketProcessor(SocketWrapper<Socket> socket, SocketStatus status) {
        this(socket);
        this.status = status;
    }

    /**
     * Handshakes, processes the socket through the handler and reacts to the
     * resulting state. All work happens while holding the wrapper's monitor.
     */
    @Override
    public void run() {
        boolean redispatch = false;
        synchronized (socket) {
            try {
                SocketState outcome = SocketState.OPEN;

                // SSL handshake; any failure means the socket must be closed.
                try {
                    serverSocketFactory.handshake(socket.getSocket());
                } catch (Throwable handshakeFailure) {
                    ExceptionUtils.handleThrowable(handshakeFailure);
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("endpoint.err.handshake"), handshakeFailure);
                    }
                    outcome = SocketState.CLOSED;
                }

                // A missing status defaults to a plain read.
                if (outcome != SocketState.CLOSED) {
                    outcome = handler.process(socket,
                            status == null ? SocketStatus.OPEN_READ : status);
                }

                if (outcome == SocketState.CLOSED) {
                    if (log.isTraceEnabled()) {
                        log.trace("Closing socket:" + socket);
                    }
                    countDownConnection();
                    try {
                        socket.getSocket().close();
                    } catch (IOException ignored) {
                        // Ignore
                    }
                } else if (outcome == SocketState.LONG) {
                    // Park the connection with the other waiting requests.
                    socket.access();
                    waitingRequests.add(socket);
                } else if (outcome == SocketState.OPEN
                        || outcome == SocketState.UPGRADING
                        || outcome == SocketState.UPGRADING_TOMCAT
                        || outcome == SocketState.UPGRADED) {
                    // Connection stays alive: schedule another pass.
                    socket.setKeptAlive(true);
                    socket.access();
                    redispatch = true;
                }
            } finally {
                if (redispatch) {
                    try {
                        getExecutor().execute(
                                new SocketProcessor(socket, SocketStatus.OPEN_READ));
                    } catch (RejectedExecutionException rejected) {
                        log.warn("Socket reprocessing request was rejected for:" + socket, rejected);
                        try {
                            // Unable to handle the connection at this time
                            handler.process(socket, SocketStatus.DISCONNECT);
                        } finally {
                            countDownConnection();
                        }
                    } catch (NullPointerException missingExecutor) {
                        // Only reported while the endpoint is still running.
                        if (running) {
                            log.error(sm.getString("endpoint.launch.fail"),
                                    missingExecutor);
                        }
                    }
                }
            }
        }
        // Finish up this request
        socket = null;
    }
}
SocketProcessor的run方法通过handler的process方法处理请求:
/**
 * Processes one event on a connection using a protocol processor and returns
 * the resulting socket state.
 * <p>
 * The processor is taken from the {@code connections} map if one is already
 * associated with the socket, otherwise from the recycled pool, otherwise a
 * new one is created. The dispatch loop repeats while the state is
 * {@code ASYNC_END}, {@code UPGRADING} or {@code UPGRADING_TOMCAT}. Afterwards
 * the processor is either kept associated with the socket or released,
 * depending on the final state.
 * <p>
 * If an exception is thrown, it is logged and the method falls through to
 * the cleanup at the end, which removes the socket from {@code connections},
 * releases a non-upgrade processor (if one was obtained) and returns
 * {@code CLOSED}.
 *
 * @param wrapper the wrapped socket; {@code null} yields {@code CLOSED}
 * @param status  the event that triggered processing
 * @return the state the connection is in after processing
 */
public SocketState process(SocketWrapper<S> wrapper,
        SocketStatus status) {
    if (wrapper == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    S socket = wrapper.getSocket();
    if (socket == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    Processor<S> processor = connections.get(socket);
    if (status == SocketStatus.DISCONNECT && processor == null) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
    }

    wrapper.setAsync(false);
    ContainerThreadMarker.markAsContainerThread();

    try {
        if (processor == null) {
            processor = recycledProcessors.poll();
        }
        if (processor == null) {
            processor = createProcessor();
        }

        initSsl(wrapper, processor);

        SocketState state = SocketState.CLOSED;
        do {
            if (status == SocketStatus.CLOSE_NOW) {
                processor.errorDispatch();
                state = SocketState.CLOSED;
            } else if (status == SocketStatus.DISCONNECT &&
                    !processor.isComet()) {
                // Do nothing here, just wait for it to get recycled
                // Don't do this for Comet we need to generate an end
                // event (see BZ 54022)
            } else if (processor.isAsync() ||
                    state == SocketState.ASYNC_END) {
                state = processor.asyncDispatch(status);
            } else if (processor.isComet()) {
                state = processor.event(status);
            } else if (processor.getUpgradeInbound() != null) {
                state = processor.upgradeDispatch();
            } else if (processor.isUpgrade()) {
                state = processor.upgradeDispatch(status);
            } else {
                state = processor.process(wrapper);
            }

            if (state != SocketState.CLOSED && processor.isAsync()) {
                state = processor.asyncPostProcess();
            }

            if (state == SocketState.UPGRADING) {
                // Get the HTTP upgrade handler
                HttpUpgradeHandler httpUpgradeHandler =
                        processor.getHttpUpgradeHandler();
                // Release the Http11 processor to be re-used
                release(wrapper, processor, false, false);
                // Create the upgrade processor
                processor = createUpgradeProcessor(
                        wrapper, httpUpgradeHandler);
                // Mark the connection as upgraded
                wrapper.setUpgraded(true);
                // Associate with the processor with the connection
                connections.put(socket, processor);
                // Initialise the upgrade handler (which may trigger
                // some IO using the new protocol which is why the lines
                // above are necessary)
                // This cast should be safe. If it fails the error
                // handling for the surrounding try/catch will deal with
                // it.
                httpUpgradeHandler.init((WebConnection) processor);
            } else if (state == SocketState.UPGRADING_TOMCAT) {
                // Get the UpgradeInbound handler
                org.apache.coyote.http11.upgrade.UpgradeInbound inbound =
                        processor.getUpgradeInbound();
                // Release the Http11 processor to be re-used
                release(wrapper, processor, false, false);
                // Create the light-weight upgrade processor
                processor = createUpgradeProcessor(wrapper, inbound);
                inbound.onUpgradeComplete();
            }
            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + wrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }
        } while (state == SocketState.ASYNC_END ||
                state == SocketState.UPGRADING ||
                state == SocketState.UPGRADING_TOMCAT);

        if (state == SocketState.LONG) {
            // In the middle of processing a request/response. Keep the
            // socket associated with the processor. Exact requirements
            // depend on type of long poll
            connections.put(socket, processor);
            longPoll(wrapper, processor);
        } else if (state == SocketState.OPEN) {
            // In keep-alive but between requests. OK to recycle
            // processor. Continue to poll for the next request.
            connections.remove(socket);
            release(wrapper, processor, false, true);
        } else if (state == SocketState.SENDFILE) {
            // Sendfile in progress. If it fails, the socket will be
            // closed. If it works, the socket will be re-added to the
            // poller
            connections.remove(socket);
            release(wrapper, processor, false, false);
        } else if (state == SocketState.UPGRADED) {
            // Need to keep the connection associated with the processor
            connections.put(socket, processor);
            // Don't add sockets back to the poller if this was a
            // non-blocking write otherwise the poller may trigger
            // multiple read events which may lead to thread starvation
            // in the connector. The write() method will add this socket
            // to the poller if necessary.
            if (status != SocketStatus.OPEN_WRITE) {
                longPoll(wrapper, processor);
            }
        } else {
            // Connection closed. OK to recycle the processor. Upgrade
            // processors are not recycled.
            connections.remove(socket);
            if (processor.isUpgrade()) {
                processor.getHttpUpgradeHandler().destroy();
            } else if (processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor) {
                // NO-OP
            } else {
                release(wrapper, processor, true, false);
            }
        }
        return state;
    } catch (java.net.SocketException e) {
        // SocketExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.socketexception.debug"), e);
    } catch (java.io.IOException e) {
        // IOExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.ioexception.debug"), e);
    }
    // Future developers: if you discover any other
    // rare-but-nonfatal exceptions, catch them here, and log as
    // above.
    catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(
                sm.getString("abstractConnectionHandler.error"), e);
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    // Don't try to add upgrade processors back into the pool.
    // The processor is still null here when obtaining or creating it
    // failed, so guard against dereferencing it on this error path.
    if (processor != null
            && !(processor instanceof org.apache.coyote.http11.upgrade.UpgradeProcessor)
            && !processor.isUpgrade()) {
        release(wrapper, processor, true, false);
    }
    return SocketState.CLOSED;
}
最后的请求处理交给了CoyoteAdapter,CoyoteAdapter的service方法用于真正处理请求。
/**
 * Services one Coyote request/response pair by invoking the first valve of
 * the container pipeline.
 * <p>
 * The container-level Request and Response are read from the ADAPTER_NOTES
 * note of the Coyote objects. When the request note is absent, both objects
 * are created through the connector, linked to each other and stored as
 * notes. After the pipeline has run, the method either leaves the request
 * open (Comet or async) or finishes the request and response and writes the
 * access log entry. The finally block recycles the wrappers or clears their
 * encoders.
 *
 * @param req the Coyote-level request
 * @param res the Coyote-level response
 * @throws Exception declared by the signature; an IOException raised inside
 *         the try block is caught and ignored here
 */
public void service(org.apache.coyote.Request req,
org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
// No request note stored yet: build and wire up the wrapper objects.
if (request == null) {
// Create objects
request = connector.createRequest();
request.setCoyoteRequest(req);
response = connector.createResponse();
response.setCoyoteResponse(res);
// Link objects
request.setResponse(response);
response.setRequest(request);
// Set as notes
req.setNote(ADAPTER_NOTES, request);
res.setNote(ADAPTER_NOTES, response);
// Set query string encoding
req.getParameters().setQueryStringEncoding
(connector.getURIEncoding());
}
// Optional X-Powered-By header, controlled by the connector setting.
if (connector.getXpoweredBy()) {
response.addHeader("X-Powered-By", POWERED_BY);
}
// Track whether the request stays open as Comet / async after this call.
boolean comet = false;
boolean async = false;
try {
// Parse and set Catalina and configuration specific
// request parameters
// The worker thread name is recorded here and cleared again in finally.
req.getRequestProcessor().setWorkerThreadName(Thread.currentThread().getName());
boolean postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
if (request.isComet()) {
if (!response.isClosed() && !response.isError()) {
if (request.getAvailable() || (request.getContentLength() > 0 && (!request.isParametersParsed()))) {
// Invoke a read event right away if there are available bytes
if (event(req, res, SocketStatus.OPEN_READ)) {
comet = true;
res.action(ActionCode.COMET_BEGIN, null);
}
} else {
comet = true;
res.action(ActionCode.COMET_BEGIN, null);
}
} else {
// Clear the filter chain, as otherwise it will not be reset elsewhere
// since this is a Comet request
request.setFilterChain(null);
}
}
}
// An async context on the request marks it as async; otherwise a
// non-Comet request is finished and logged right here.
AsyncContextImpl asyncConImpl = (AsyncContextImpl)request.getAsyncContext();
if (asyncConImpl != null) {
async = true;
} else if (!comet) {
request.finishRequest();
response.finishResponse();
if (postParseSuccess &&
request.getMappingData().context != null) {
// Log only if processing was invoked.
// If postParseRequest() failed, it has already logged it.
// If context is null this was the start of a comet request
// that failed and has already been logged.
((Context) request.getMappingData().context).logAccess(
request, response,
System.currentTimeMillis() - req.getStartTime(),
false);
}
req.action(ActionCode.POST_REQUEST , null);
}
} catch (IOException e) {
// Ignore
} finally {
req.getRequestProcessor().setWorkerThreadName(null);
// Ask the Coyote response whether an error was flagged.
AtomicBoolean error = new AtomicBoolean(false);
res.action(ActionCode.IS_ERROR, error);
// Recycle the wrapper request and response
// Condition reads as: (neither comet nor async) OR an error occurred.
if (!comet && !async || error.get()) {
request.recycle();
response.recycle();
} else {
// Clear converters so that the minimum amount of memory
// is used by this processor
request.clearEncoders();
response.clearEncoders();
}
}
}
CoyoteAdapter的service方法最后会将请求交给Engine的Pipeline去处理
代码:connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);