Preface
This article takes a look at Sentinel's NettyHttpCommandCenter.
NettyHttpCommandCenter
com/alibaba/csp/sentinel/transport/command/NettyHttpCommandCenter.java
public class NettyHttpCommandCenter implements CommandCenter {

    private final HttpServer server = new HttpServer();

    // Single dedicated thread that runs the (blocking) Netty server
    private final ExecutorService pool = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-netty-command-center-executor"));

    @Override
    public void start() throws Exception {
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    server.start();
                } catch (Exception ex) {
                    RecordLog.info("Start netty server error", ex);
                    ex.printStackTrace();
                    System.exit(-1);
                }
            }
        });
    }

    @Override
    public void stop() throws Exception {
        server.close();
        pool.shutdownNow();
    }

    @Override
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        server.registerCommands(handlers);
    }
}
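- A dedicated executor created with newSingleThreadExecutor starts the HttpServer asynchronously, because HttpServer.start() blocks until the server channel is closed.
- Before startup, beforeStart() calls CommandHandlerProvider.getInstance().namedHandlers(), which triggers the collection of command names and their handlers.
- It then calls server.registerCommands(handlers) to register those handlers.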
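The asynchronous start described above can be illustrated with a minimal, stdlib-only sketch. It does not use Netty: BlockingServer and AsyncStartSketch are hypothetical names, and the latch merely imitates a start() call that blocks until the server is closed. The point is only that submitting the blocking call to a single-thread executor lets the caller continue immediately.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncStartSketch {

    /** Hypothetical stand-in for HttpServer: start() blocks until close() is called. */
    static final class BlockingServer {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch closed = new CountDownLatch(1);

        void start() throws InterruptedException {
            started.countDown(); // pretend the port is bound
            closed.await();      // block, much like channel.closeFuture().sync()
        }

        void close() {
            closed.countDown();
        }

        boolean awaitStarted(long millis) throws InterruptedException {
            return started.await(millis, TimeUnit.MILLISECONDS);
        }
    }

    /** Starts the server on a dedicated thread, stops it, and reports whether both steps worked. */
    static boolean startThenStop() {
        final BlockingServer server = new BlockingServer();
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // submit() returns at once; the blocking start() runs on the pool thread
            pool.submit(() -> {
                try {
                    server.start();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            boolean started = server.awaitStarted(1000);
            server.close();  // unblocks start(), so the pool thread can finish
            pool.shutdown();
            boolean terminated = pool.awaitTermination(1000, TimeUnit.MILLISECONDS);
            return started && terminated;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("started and stopped cleanly: " + startThenStop());
    }
}
```

Had start() been called directly on the caller's thread, nothing after it would run until the server shut down, which is presumably why the command center hands it to its own executor.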
HttpServer
com/alibaba/csp/sentinel/transport/command/netty/HttpServer.java
public final class HttpServer {

    private static final int DEFAULT_PORT = 8719;

    private Channel channel;

    final static Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new HttpServerInitializer());
            int port;
            try {
                if (StringUtil.isEmpty(TransportConfig.getPort())) {
                    CommandCenterLog.info("Port not configured, using default port: " + DEFAULT_PORT);
                    port = DEFAULT_PORT;
                } else {
                    port = Integer.parseInt(TransportConfig.getPort());
                }
            } catch (Exception e) {
                throw new IllegalArgumentException("Illegal port: " + TransportConfig.getPort());
            }
            channel = b.bind(port).sync().channel();
            // Blocks here until the server channel is closed
            channel.closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public void close() {
        channel.close();
    }

    public void registerCommand(String commandName, CommandHandler handler) {
        if (StringUtil.isEmpty(commandName) || handler == null) {
            return;
        }
        // The first registration wins; duplicates are logged and ignored
        if (handlerMap.containsKey(commandName)) {
            CommandCenterLog.info("Register failed (duplicate command): " + commandName);
            return;
        }
        handlerMap.put(commandName, handler);
    }

    public void registerCommands(Map<String, CommandHandler> handlerMap) {
        if (handlerMap != null) {
            for (Entry<String, CommandHandler> e : handlerMap.entrySet()) {
                registerCommand(e.getKey(), e.getValue());
            }
        }
    }
}
- Netty is used here to build an NIO server; HttpServerInitializer is installed as the child handler that sets up each accepted channel.
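Two small pieces of logic in HttpServer are easy to overlook: the fallback to port 8719 when no port is configured, and the rule that a duplicate command name is rejected. The sketch below reproduces both with the standard library only. CommandRegistrySketch and its nested Handler interface are hypothetical stand-ins, not Sentinel types, and the sketch deliberately swaps the containsKey-then-put pair for putIfAbsent so the check and the insert happen in one atomic step.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CommandRegistrySketch {

    static final int DEFAULT_PORT = 8719;

    /** Hypothetical stand-in for Sentinel's CommandHandler. */
    interface Handler {
        String handle(Map<String, String> params);
    }

    private final Map<String, Handler> handlers = new ConcurrentHashMap<>();

    /** Uses the default port when nothing is configured; rejects non-numeric values. */
    static int resolvePort(String configured) {
        if (configured == null || configured.isEmpty()) {
            return DEFAULT_PORT;
        }
        try {
            return Integer.parseInt(configured);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Illegal port: " + configured);
        }
    }

    /** Returns true if registered; false for blank names, null handlers, or duplicates. */
    boolean register(String name, Handler handler) {
        if (name == null || name.isEmpty() || handler == null) {
            return false;
        }
        // putIfAbsent returns null only when no handler was registered under this name
        return handlers.putIfAbsent(name, handler) == null;
    }

    /** Returns the handler for the name, or null if none is registered. */
    Handler lookup(String name) {
        return name == null ? null : handlers.get(name);
    }

    public static void main(String[] args) {
        CommandRegistrySketch registry = new CommandRegistrySketch();
        System.out.println("port: " + resolvePort(""));
        System.out.println("first registration accepted: " + registry.register("version", p -> "1.0"));
        System.out.println("duplicate accepted: " + registry.register("version", p -> "2.0"));
        System.out.println("result: " + registry.lookup("version").handle(Collections.emptyMap()));
    }
}
```

With a single registering thread, as in beforeStart(), the two approaches behave the same; putIfAbsent only matters if registrations could race.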
HttpServerInitializer
com/alibaba/csp/sentinel/transport/command/netty/HttpServerInitializer.java
public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpObjectAggregator(1024 * 1024));
        p.addLast(new HttpResponseEncoder());
        p.addLast(new HttpServerHandler());
    }
}
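- The pipeline consists of HttpRequestDecoder, HttpObjectAggregator (with a 1 MB maximum content length), HttpResponseEncoder, and HttpServerHandler; the first three are components that ship with Netty.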
HttpServerHandler
com/alibaba/csp/sentinel/transport/command/netty/HttpServerHandler.java
public class HttpServerHandler extends SimpleChannelInboundHandler
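- It extends SimpleChannelInboundHandler.
- The channelRead0 method parses the incoming request into a CommandRequest, looks up the corresponding command name, invokes the matching CommandHandler, and finally writes the result back.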
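The parse, look up, invoke sequence can be sketched without Netty. The code below is an illustration under assumptions, not the body of channelRead0: DispatchSketch is a hypothetical name, handlers are modeled as plain Function objects, and the URI layout (command name as the path, parameters in the query string) is assumed for the example. URL decoding is left out to keep it short.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class DispatchSketch {

    /** Extracts the command name: the path without its leading slash and query string. */
    static String commandName(String uri) {
        int q = uri.indexOf('?');
        String path = q >= 0 ? uri.substring(0, q) : uri;
        return path.startsWith("/") ? path.substring(1) : path;
    }

    /** Parses "a=1&b=2" style query parameters (no URL decoding in this sketch). */
    static Map<String, String> parameters(String uri) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0 || q == uri.length() - 1) {
            return params;
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, ""); // a key without a value
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    /** Looks up the handler for the command and returns its result, or an error text. */
    static String dispatch(String uri, Map<String, Function<Map<String, String>, String>> handlers) {
        String name = commandName(uri);
        Function<Map<String, String>, String> handler = handlers.get(name);
        if (handler == null) {
            return "Unknown command: " + name;
        }
        return handler.apply(parameters(uri));
    }

    public static void main(String[] args) {
        Map<String, Function<Map<String, String>, String>> handlers = new HashMap<>();
        handlers.put("echo", p -> "echo:" + p.get("msg"));
        System.out.println(dispatch("/echo?msg=hi", handlers));
        System.out.println(dispatch("/nope", handlers));
    }
}
```

In the real handler the result would be written to the channel as an HTTP response rather than returned as a string.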
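Summary
NettyHttpCommandCenter provides a Netty-based HTTP implementation. sentinel-transport also contains SimpleHttpCommandCenter, which is built on plain Java sockets using blocking I/O (BIO) plus a worker thread pool.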
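For contrast with the Netty version, here is a minimal sketch of the BIO-plus-worker-pool model mentioned above. It is not SimpleHttpCommandCenter's code: BioServerSketch, the pool size of 4, and the one-line text protocol are all assumptions made for the example. One thread blocks in accept(), and each accepted socket is handed to a worker thread.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BioServerSketch implements AutoCloseable {

    private final ServerSocket serverSocket;
    private final ExecutorService acceptor = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    BioServerSketch() throws IOException {
        // Port 0 asks the OS for any free port; loopback keeps the sketch local
        serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        acceptor.execute(this::acceptLoop);
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    private void acceptLoop() {
        while (!serverSocket.isClosed()) {
            try {
                Socket client = serverSocket.accept(); // blocking accept
                workers.execute(() -> handle(client)); // hand off to the worker pool
            } catch (IOException e) {
                // accept() fails once the server socket is closed; the loop condition then ends
            }
        }
    }

    private static void handle(Socket client) {
        try (Socket s = client;
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             PrintWriter out = new PrintWriter(
                 new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8), true)) {
            out.println("handled: " + in.readLine()); // blocking read, then reply
        } catch (IOException ignored) {
            // a failed client connection should not affect the server
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        acceptor.shutdownNow();
        workers.shutdownNow();
    }

    /** Starts a server, sends one line to it, and returns the reply. */
    static String roundTrip(String message) {
        try (BioServerSketch server = new BioServerSketch();
             Socket socket = new Socket(InetAddress.getLoopbackAddress(), server.port());
             PrintWriter out = new PrintWriter(
                 new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8), true);
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            out.println(message);
            return in.readLine();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

In this model every in-flight request occupies a worker thread for its whole duration, whereas the Netty implementation multiplexes connections over a small number of event-loop threads.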
doc