A Look at Sentinel's NettyHttpCommandCenter
Published: 2019-06-26


This post takes a look at Sentinel's NettyHttpCommandCenter.

NettyHttpCommandCenter

com/alibaba/csp/sentinel/transport/command/NettyHttpCommandCenter.java

public class NettyHttpCommandCenter implements CommandCenter {

    private final HttpServer server = new HttpServer();

    private final ExecutorService pool = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-netty-command-center-executor"));

    @Override
    public void start() throws Exception {
        pool.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    server.start();
                } catch (Exception ex) {
                    RecordLog.info("Start netty server error", ex);
                    ex.printStackTrace();
                    System.exit(-1);
                }
            }
        });
    }

    @Override
    public void stop() throws Exception {
        server.close();
        pool.shutdownNow();
    }

    @Override
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        server.registerCommands(handlers);
    }
}
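  • A dedicated executor created with newSingleThreadExecutor starts the HttpServer asynchronously, since server.start() blocks until the channel is closed
  • Before startup, beforeStart() calls CommandHandlerProvider.getInstance().namedHandlers(), which triggers the collection of commands and their handlers
  • It then calls server.registerCommands(handlers) to register those handlers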
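
To illustrate why the blocking start is handed to a single-thread executor, here is a minimal JDK-only sketch. BlockingServerSketch and AsyncStartSketch are hypothetical names that do not exist in Sentinel, and the latch merely stands in for the blocking channel.closeFuture().sync() call in HttpServer.start().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a server whose start() blocks until close() is called.
final class BlockingServerSketch {
    private final CountDownLatch running = new CountDownLatch(1);
    private final CountDownLatch closed = new CountDownLatch(1);

    void start() throws InterruptedException {
        running.countDown(); // signal that the server thread is alive
        closed.await();      // block, much like channel.closeFuture().sync()
    }

    void close() {
        closed.countDown();
    }

    boolean awaitRunning(long timeoutMillis) throws InterruptedException {
        return running.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}

final class AsyncStartSketch {
    private final BlockingServerSketch server = new BlockingServerSketch();
    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    // Returns immediately; the blocking start() runs on the pool thread.
    void start() {
        pool.submit(() -> {
            try {
                server.start();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    void stop() {
        server.close();
        pool.shutdownNow();
    }

    // Starts the server, verifies that the caller was not blocked, then stops it.
    static boolean demo() {
        AsyncStartSketch center = new AsyncStartSketch();
        center.start();
        try {
            return center.server.awaitRunning(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            center.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println("server thread running: " + demo());
    }
}
```

If start() called server.start() directly, the calling thread would stay parked until the server was closed, which is presumably what the dedicated executor is there to avoid.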

HttpServer

com/alibaba/csp/sentinel/transport/command/netty/HttpServer.java

public final class HttpServer {

    private static final int DEFAULT_PORT = 8719;

    private Channel channel;

    final static Map<String, CommandHandler> handlerMap = new ConcurrentHashMap<String, CommandHandler>();

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new HttpServerInitializer());
            int port;
            try {
                if (StringUtil.isEmpty(TransportConfig.getPort())) {
                    CommandCenterLog.info("Port not configured, using default port: " + DEFAULT_PORT);
                    port = DEFAULT_PORT;
                } else {
                    port = Integer.parseInt(TransportConfig.getPort());
                }
            } catch (Exception e) {
                throw new IllegalArgumentException("Illegal port: " + TransportConfig.getPort());
            }
            channel = b.bind(port).sync().channel();
            channel.closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public void close() {
        channel.close();
    }

    public void registerCommand(String commandName, CommandHandler handler) {
        if (StringUtil.isEmpty(commandName) || handler == null) {
            return;
        }
        if (handlerMap.containsKey(commandName)) {
            CommandCenterLog.info("Register failed (duplicate command): " + commandName);
            return;
        }
        handlerMap.put(commandName, handler);
    }

    public void registerCommands(Map<String, CommandHandler> handlerMap) {
        if (handlerMap != null) {
            for (Entry<String, CommandHandler> e : handlerMap.entrySet()) {
                registerCommand(e.getKey(), e.getValue());
            }
        }
    }
}
  • Netty is used here to build an NIO server, with HttpServerInitializer as the child handler that sets up each accepted channel
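
The registration rule in registerCommand (skip empty names and null handlers, first registration of a name wins) can be sketched without Netty. CommandRegistrySketch and its Handler interface are hypothetical and much simpler than Sentinel's CommandHandler. Note one deliberate difference: the quoted code checks containsKey and then calls put, whereas this sketch swaps in putIfAbsent so that the check and the insert happen in one atomic step.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class CommandRegistrySketch {
    // Hypothetical handler type, used only for this illustration.
    interface Handler {
        String handle(String request);
    }

    private final Map<String, Handler> handlers = new ConcurrentHashMap<>();

    // Returns true when registered, false when rejected
    // (empty name, null handler, or a duplicate command name).
    boolean register(String name, Handler handler) {
        if (name == null || name.isEmpty() || handler == null) {
            return false;
        }
        // putIfAbsent checks and inserts atomically, so the first registration wins.
        return handlers.putIfAbsent(name, handler) == null;
    }

    // Returns the handler for the command, or null when there is none.
    Handler lookup(String name) {
        return (name == null || name.isEmpty()) ? null : handlers.get(name);
    }

    public static void main(String[] args) {
        CommandRegistrySketch registry = new CommandRegistrySketch();
        System.out.println(registry.register("version", req -> "first"));
        System.out.println(registry.register("version", req -> "second"));
        System.out.println(registry.lookup("version").handle(""));
    }
}
```

The second registration of "version" is rejected, so a lookup still returns the first handler.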

HttpServerInitializer

com/alibaba/csp/sentinel/transport/command/netty/HttpServerInitializer.java

public class HttpServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline p = socketChannel.pipeline();
        p.addLast(new HttpRequestDecoder());
        p.addLast(new HttpObjectAggregator(1024 * 1024));
        p.addLast(new HttpResponseEncoder());
        p.addLast(new HttpServerHandler());
    }
}
  • The pipeline installs HttpRequestDecoder, HttpObjectAggregator, HttpResponseEncoder and HttpServerHandler; the first three are components that ship with Netty

HttpServerHandler

com/alibaba/csp/sentinel/transport/command/netty/HttpServerHandler.java

public class HttpServerHandler extends SimpleChannelInboundHandler<Object> {

    private final CodecRegistry codecRegistry = new CodecRegistry();

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        FullHttpRequest httpRequest = (FullHttpRequest)msg;
        try {
            CommandRequest request = parseRequest(httpRequest);
            if (StringUtil.isBlank(HttpCommandUtils.getTarget(request))) {
                writeErrorResponse(BAD_REQUEST.code(), "Invalid command", ctx);
                return;
            }
            handleRequest(request, ctx, HttpUtil.isKeepAlive(httpRequest));
        } catch (Exception ex) {
            writeErrorResponse(INTERNAL_SERVER_ERROR.code(), SERVER_ERROR_MESSAGE, ctx);
            CommandCenterLog.warn("Internal error", ex);
        }
    }

    private void handleRequest(CommandRequest request, ChannelHandlerContext ctx, boolean keepAlive)
        throws Exception {
        String commandName = HttpCommandUtils.getTarget(request);
        // Find the matching command handler.
        CommandHandler<?> commandHandler = getHandler(commandName);
        if (commandHandler != null) {
            CommandResponse<?> response = commandHandler.handle(request);
            writeResponse(response, ctx, keepAlive);
        } else {
            // No matching command handler.
            writeErrorResponse(BAD_REQUEST.code(), String.format("Unknown command \"%s\"", commandName), ctx);
        }
    }

    private Encoder<?> pickEncoder(Class<?> clazz) {
        if (clazz == null) {
            throw new IllegalArgumentException("Bad class metadata");
        }
        for (Encoder<?> encoder : codecRegistry.getEncoderList()) {
            if (encoder.canEncode(clazz)) {
                return encoder;
            }
        }
        return null;
    }

    private void writeErrorResponse(int statusCode, String message, ChannelHandlerContext ctx) {
        FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
            HttpResponseStatus.valueOf(statusCode),
            Unpooled.copiedBuffer(message, Charset.forName(SentinelConfig.charset())));
        httpResponse.headers().set("Content-Type", "text/plain; charset=" + SentinelConfig.charset());
        ctx.write(httpResponse);
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    private void writeResponse(CommandResponse response, ChannelHandlerContext ctx, boolean keepAlive)
        throws Exception {
        byte[] body;
        if (response.isSuccess()) {
            if (response.getResult() == null) {
                body = new byte[] {};
            } else {
                Encoder encoder = pickEncoder(response.getResult().getClass());
                if (encoder == null) {
                    writeErrorResponse(INTERNAL_SERVER_ERROR.code(), SERVER_ERROR_MESSAGE, ctx);
                    CommandCenterLog.warn("Error when encoding object",
                        new IllegalStateException("No compatible encoder"));
                    return;
                }
                body = encoder.encode(response.getResult());
            }
        } else {
            body = response.getException().getMessage().getBytes(SentinelConfig.charset());
        }
        HttpResponseStatus status = response.isSuccess() ? OK : BAD_REQUEST;
        FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status,
            Unpooled.copiedBuffer(body));
        httpResponse.headers().set("Content-Type", "text/plain; charset=" + SentinelConfig.charset());
        //if (keepAlive) {
        //    httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes());
        //    httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        //}
        //ctx.write(httpResponse);
        //if (!keepAlive) {
        //    ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        //}
        httpResponse.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, httpResponse.content().readableBytes());
        httpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        ctx.write(httpResponse);
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    private CommandRequest parseRequest(FullHttpRequest request) {
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri());
        CommandRequest serverRequest = new CommandRequest();
        Map<String, List<String>> paramMap = queryStringDecoder.parameters();
        // Parse request parameters.
        if (!paramMap.isEmpty()) {
            for (Entry<String, List<String>> p : paramMap.entrySet()) {
                if (!p.getValue().isEmpty()) {
                    serverRequest.addParam(p.getKey(), p.getValue().get(0));
                }
            }
        }
        // Parse command name.
        String target = parseTarget(queryStringDecoder.rawPath());
        serverRequest.addMetadata(HttpCommandUtils.REQUEST_TARGET, target);
        // Parse body.
        if (request.content().readableBytes() <= 0) {
            serverRequest.setBody(null);
        } else {
            serverRequest.setBody(request.content().array());
        }
        return serverRequest;
    }

    private String parseTarget(String uri) {
        if (StringUtil.isEmpty(uri)) {
            return "";
        }
        String[] arr = uri.split("/");
        if (arr.length < 2) {
            return "";
        }
        return arr[1];
    }

    private CommandHandler getHandler(String commandName) {
        if (StringUtil.isEmpty(commandName)) {
            return null;
        }
        return HttpServer.handlerMap.get(commandName);
    }

    private void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
            HttpResponseStatus.CONTINUE);
        ctx.write(response);
    }

    private static final String SERVER_ERROR_MESSAGE = "Command server error";
}
  • It extends SimpleChannelInboundHandler
  • channelRead0 parses the HTTP request into a CommandRequest, extracts the commandName, invokes the matching CommandHandler, and finally writes the result back
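
The way the command name and parameters are derived from the request URI can be tried out with a small JDK-only sketch. RequestParsingSketch is a hypothetical class: parseTarget mirrors the quoted method, while rawPath and parseParams are simplified stand-ins for Netty's QueryStringDecoder (no URL decoding) that keep only the first value of each parameter, as parseRequest does. The URIs are illustrative inputs only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RequestParsingSketch {
    // Mirrors the quoted parseTarget: the first path segment is the command name.
    static String parseTarget(String path) {
        if (path == null || path.isEmpty()) {
            return "";
        }
        String[] parts = path.split("/");
        return parts.length < 2 ? "" : parts[1];
    }

    // Everything before '?' (simplified stand-in for QueryStringDecoder.rawPath()).
    static String rawPath(String uri) {
        int q = uri.indexOf('?');
        return q < 0 ? uri : uri.substring(0, q);
    }

    // Simplified query parsing: no URL decoding, the first value of a key wins.
    static Map<String, String> parseParams(String uri) {
        Map<String, String> params = new LinkedHashMap<>();
        int q = uri.indexOf('?');
        if (q < 0 || q == uri.length() - 1) {
            return params;
        }
        for (String pair : uri.substring(q + 1).split("&")) {
            if (pair.isEmpty()) {
                continue;
            }
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.putIfAbsent(key, value);
        }
        return params;
    }

    public static void main(String[] args) {
        String uri = "/getRules?type=flow&type=degrade";
        System.out.println(parseTarget(rawPath(uri))); // command name
        System.out.println(parseParams(uri));          // first value per key
    }
}
```

A path without a leading slash, or a bare "/", yields an empty target, which is the case channelRead0 answers with "Invalid command".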

小结

NettyHttpCommandCenter is the Netty-based HTTP implementation. sentinel-transport also provides SimpleHttpCommandCenter, which is built on blocking Java sockets (BIO) plus a worker thread pool.
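
For contrast, the "blocking socket plus worker thread pool" pattern can be sketched roughly as follows. This is not SimpleHttpCommandCenter's code: BioCommandServerSketch, the pool size of 4 and the echo-style response are all assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class BioCommandServerSketch implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final ExecutorService acceptor = Executors.newSingleThreadExecutor();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    BioCommandServerSketch() throws IOException {
        // Port 0 lets the OS pick a free port; bind to loopback only.
        serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    // One thread blocks in accept(); each connection is handed to the worker pool.
    void start() {
        acceptor.submit(() -> {
            while (!serverSocket.isClosed()) {
                try {
                    Socket socket = serverSocket.accept();
                    workers.submit(() -> handle(socket));
                } catch (IOException e) {
                    break; // accept() fails once the server socket is closed
                }
            }
        });
    }

    private void handle(Socket socket) {
        try (Socket s = socket;
             BufferedReader in = new BufferedReader(
                 new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
             OutputStream out = s.getOutputStream()) {
            String requestLine = in.readLine();
            String line;
            while ((line = in.readLine()) != null && !line.isEmpty()) {
                // skip the remaining request headers
            }
            byte[] body = ("received: " + requestLine).getBytes(StandardCharsets.UTF_8);
            String head = "HTTP/1.1 200 OK\r\nContent-Type: text/plain; charset=utf-8\r\n"
                + "Content-Length: " + body.length + "\r\nConnection: close\r\n\r\n";
            out.write(head.getBytes(StandardCharsets.UTF_8));
            out.write(body);
            out.flush();
        } catch (IOException ignored) {
            // the client went away; try-with-resources already closed the socket
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        acceptor.shutdownNow();
        workers.shutdown();
    }

    // Starts a server, sends one request over a plain socket and returns the raw response.
    static String demo() {
        try (BioCommandServerSketch server = new BioCommandServerSketch()) {
            server.start();
            try (Socket client = new Socket(InetAddress.getLoopbackAddress(), server.port())) {
                OutputStream out = client.getOutputStream();
                out.write("GET /ping HTTP/1.1\r\nHost: localhost\r\n\r\n".getBytes(StandardCharsets.UTF_8));
                out.flush();
                InputStream in = client.getInputStream();
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                byte[] chunk = new byte[1024];
                int n;
                while ((n = in.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
                return new String(buf.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model each connection occupies a worker thread while it is being served, whereas the Netty version multiplexes connections over a small set of event-loop threads.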

Reposted from: https://my.oschina.net/go4it/blog/1930792
