博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Dubbo性能调优参数及原理
阅读量:4179 次
发布时间:2019-05-26

本文共 18754 字,大约阅读时间需要 62 分钟。

本文是针对 Dubbo 协议调用的调优指导,详细说明常用调优参数的作用域及源码。

Dubbo调用模型

640?wx_fmt=png

常用性能调优参数

参数名 作用范围 默认值 说明 备注
threads provider 200 业务处理线程池大小
iothreads provider CPU+1 io线程池大小
queues provider 0

线程池队列大小,当线程池满时,排队等待执行的队列大小,

建议不要设置,当线程程池时应立即失败,

重试其它服务提供机器,而不是排队,除非有特殊需求

connections consumer 0

对每个提供者的最大连接数,

rmi、http、hessian等短连接协议表示限制连接数,

Dubbo等长连接协表示建立的长连接个数

Dubbo协议默认共享一个长连接
actives consumer 0 每服务消费者每服务每方法最大并发调用数 0表示不限制
acceptes provider 0 服务提供方最大可接受连接数 0表示不限制
executes provider 0 服务提供者每服务每方法最大可并行执行请求数 0表示不限制

 

源码及原理分析

>>  threads

FixedThreadPool.java

 
public Executor getExecutor(URL url) {    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);    return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,             queues == 0 ? new SynchronousQueue
() :                     (queues < 0 ? new LinkedBlockingQueue
() :                             new LinkedBlockingQueue
(queues)),            new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));}     String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);     int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);     int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);     return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,              queues == 0 ? new SynchronousQueue
() :                      (queues < 0 ? new LinkedBlockingQueue
() :                              new LinkedBlockingQueue
(queues)),             new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }

 

LimitedThreadPool.java

 
public Executor getExecutor(URL url) {    String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);    int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);    int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);    int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);    return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,             queues == 0 ? new SynchronousQueue
() :                     (queues < 0 ? new LinkedBlockingQueue
() :                             new LinkedBlockingQueue
(queues)),            new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));}     String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);     int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);     int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);     int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);     return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,              queues == 0 ? new SynchronousQueue
() :                      (queues < 0 ? new LinkedBlockingQueue
() :                              new LinkedBlockingQueue
(queues)),             new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }

其中,Constants.DEFAULT_QUEUES = 200。threads 参数配置的是业务处理线程池的最大(或核心)线程数。

 

>>  iothreads

NettyServer.java

 
@Overrideprotected void doOpen() throws Throwable {    NettyHelper.setNettyLoggerFactory();    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));    bootstrap = new ServerBootstrap(channelFactory);    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);    channels = nettyHandler.getChannels();    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {        public ChannelPipeline getPipeline() {            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);            ChannelPipeline pipeline = Channels.pipeline();            pipeline.addLast("decoder", adapter.getDecoder());            pipeline.addLast("encoder", adapter.getEncoder());            pipeline.addLast("handler", nettyHandler);            return pipeline;        }    });    // bind    channel = bootstrap.bind(getBindAddress());} protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();     ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));     ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));     ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));     bootstrap = new ServerBootstrap(channelFactory);     final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);     channels = nettyHandler.getChannels();     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);             ChannelPipeline pipeline = Channels.pipeline();             pipeline.addLast("decoder", adapter.getDecoder());             pipeline.addLast("encoder", adapter.getEncoder());             pipeline.addLast("handler", nettyHandler);             return pipeline;         }     });     // bind     channel = bootstrap.bind(getBindAddress()); }

>>  queues

分别在 FixedThreadPool.java、LimitedThreadPool.java 和 CachedThreadPool.java 中使用,代码详情见 3.2章节。 由代码可见,默认值为 0,表示使用同步阻塞队列;如果 queues 设置为小于 0 的值,则使用容量为 Integer.MAX_VALUE 的阻塞链表队列;如果为其他值,则使用指定大小的阻塞链表队列。

 

>>  connections

DubboProtocol.java

 
private ExchangeClient[] getClients(URL url){    //是否共享连接    boolean service_share_connect = false;    int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);    //如果connections不配置,则共享连接,否则每服务每连接    if (connections == 0){        service_share_connect = true;        connections = 1;    }    ExchangeClient[] clients = new ExchangeClient[connections];    for (int i = 0; i < clients.length; i++) {        if (service_share_connect){            clients[i] = getSharedClient(url);        } else {            clients[i] = initClient(url);        }    }    return clients;}     //是否共享连接     boolean service_share_connect = false;     int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);     //如果connections不配置,则共享连接,否则每服务每连接     if (connections == 0){
        service_share_connect = true;         connections = 1;     }     ExchangeClient[] clients = new ExchangeClient[connections];     for (int i = 0; i < clients.length; i++) {
        if (service_share_connect){
            clients[i] = getSharedClient(url);         } else {
            clients[i] = initClient(url);         }     }     return clients; }

 

DubboInvoker.java

 
@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable {    RpcInvocation inv = (RpcInvocation) invocation;    final String methodName = RpcUtils.getMethodName(invocation);    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());    inv.setAttachment(Constants.VERSION_KEY, version);    ExchangeClient currentClient;    if (clients.length == 1) {        currentClient = clients[0];    } else {        currentClient = clients[index.getAndIncrement() % clients.length];    }    try {        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);        if (isOneway) {            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);            currentClient.send(inv, isSent);            RpcContext.getContext().setFuture(null);            return new RpcResult();        } else if (isAsync) {            ResponseFuture future = currentClient.request(inv, timeout) ;            RpcContext.getContext().setFuture(new FutureAdapter(future));            return new RpcResult();        } else {            RpcContext.getContext().setFuture(null);            return (Result) currentClient.request(inv, timeout).get();        }    } catch (TimeoutException e) {        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);    } catch (RemotingException e) {        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);    }} protected Result doInvoke(final Invocation invocation) throws Throwable {
    RpcInvocation inv = (RpcInvocation) invocation;     final String methodName = RpcUtils.getMethodName(invocation);     inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());     inv.setAttachment(Constants.VERSION_KEY, version);     ExchangeClient currentClient;     if (clients.length == 1) {
        currentClient = clients[0];     } else {
        currentClient = clients[index.getAndIncrement() % clients.length];     }     try {
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);         boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);         int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);         if (isOneway) {
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);             currentClient.send(inv, isSent);             RpcContext.getContext().setFuture(null);             return new RpcResult();         } else if (isAsync) {
            ResponseFuture future = currentClient.request(inv, timeout) ;             RpcContext.getContext().setFuture(new FutureAdapter(future));             return new RpcResult();         } else {
            RpcContext.getContext().setFuture(null);             return (Result) currentClient.request(inv, timeout).get();         }     } catch (TimeoutException e) {
        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);     } catch (RemotingException e) {
        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);     } }

以上可见,默认值为0,表示针对每个 Provider,所有客户端共享一个长连接;否则,建立指定数量的长连接。在调用时,如果有多个长连接,则使用轮询方式获得一个长连接。

 

>>  actives

ActiveLimitFilter.java

 
public Result invoke(Invoker
 invoker, Invocation invocation) throws RpcException {    URL url = invoker.getUrl();    String methodName = invocation.getMethodName();    int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);    RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());    if (max > 0) {        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);        long start = System.currentTimeMillis();        long remain = timeout;        int active = count.getActive();        if (active >= max) {            synchronized (count) {                while ((active = count.getActive()) >= max) {                    try {                        count.wait(remain);                    } catch (InterruptedException e) {                    }                    long elapsed = System.currentTimeMillis() - start;                    remain = timeout - elapsed;                    if (remain <= 0) {                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "                                               + invoker.getInterface().getName() + ", method: "                                               + invocation.getMethodName() + ", elapsed: " + elapsed                                               + ", timeout: " + timeout + ". concurrent invokes: " + active                                               + ". max concurrent invoke limit: " + max);                    }                }            }        }    }    try {        long begin = System.currentTimeMillis();        RpcStatus.beginCount(url, methodName);        try {            Result result = invoker.invoke(invocation);            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);            return result;        } catch (RuntimeException t) {            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);            throw t;        }    } finally {        if(max>0){            synchronized (count) {                count.notify();            }         }    }}     URL url = invoker.getUrl();     String methodName = invocation.getMethodName();     int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);     RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());     if (max > 0) {
        long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);         long start = System.currentTimeMillis();         long remain = timeout;         int active = count.getActive();         if (active >= max) {
            synchronized (count) {
                while ((active = count.getActive()) >= max) {
                    try {
                        count.wait(remain);                     } catch (InterruptedException e) {
                    }                     long elapsed = System.currentTimeMillis() - start;                     remain = timeout - elapsed;                     if (remain <= 0) {
                        throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "                                                + invoker.getInterface().getName() + ", method: "                                                + invocation.getMethodName() + ", elapsed: " + elapsed                                                + ", timeout: " + timeout + ". concurrent invokes: " + active                                                + ". max concurrent invoke limit: " + max);                     }                 }             }         }     }     try {
        long begin = System.currentTimeMillis();         RpcStatus.beginCount(url, methodName);         try {
            Result result = invoker.invoke(invocation);             RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);             return result;         } catch (RuntimeException t) {
            RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);             throw t;         }     } finally {
        if(max>0){
            synchronized (count) {
                count.notify();             }          }     } }

Consumer 调用时,统计服务和方法维度的调用情况,如果并发数超过设置的最大值,则阻塞当前线程,直到前面有请求处理完成。

 

>>  accepts

AbstractServer.java

 
@Overridepublic void connected(Channel ch) throws RemotingException {    Collection
 channels = getChannels();    if (accepts > 0 && channels.size() > accepts) {        logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);        ch.close();        return;    }    super.connected(ch);} public void connected(Channel ch) throws RemotingException {
    Collection
 channels = getChannels();     if (accepts > 0 && channels.size() > accepts) {
        logger.error("Close channel " + ch + ", cause: The server " + ch.getLocalAddress() + " connections greater than max config " + accepts);         ch.close();         return;     }     super.connected(ch); }

当连接数大于最大值时,关闭当前连接。

 

>>  executes

ExecuteLimitFilter.java

 
public Result invokeOrg(Invoker
 invoker, Invocation invocation) throws RpcException {    URL url = invoker.getUrl();    String methodName = invocation.getMethodName();    int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);    if (max > 0) {        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());        if (count.getActive() >= max) {            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than 
 limited.");        }    }    long begin = System.currentTimeMillis();    boolean isException = false;    RpcStatus.beginCount(url, methodName);    try {        Result result = invoker.invoke(invocation);        return result;    } catch (Throwable t) {        isException = true;        if(t instanceof RuntimeException) {            throw (RuntimeException) t;        }        else {            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);        }    }    finally {        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isException);    }}     URL url = invoker.getUrl();     String methodName = invocation.getMethodName();     int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0);     if (max > 0) {
        RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName());         if (count.getActive() >= max) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than 
 limited.");         }     }     long begin = System.currentTimeMillis();     boolean isException = false;     RpcStatus.beginCount(url, methodName);     try {
        Result result = invoker.invoke(invocation);         return result;     } catch (Throwable t) {
        isException = true;         if(t instanceof RuntimeException) {
            throw (RuntimeException) t;         }         else {
            throw new RpcException("unexpected exception when ExecuteLimitFilter", t);         }     }     finally {
        RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isException);     } }

Provider处理请求时,统计方法维度的调用情况,如果并发数超过设置的最大值,则阻直接抛出异常。

 

作者:cyfonly

出处:http://www.cnblogs.com/cyfonly/

640?wx_fmt=png

你可能感兴趣的文章
SpringMvc注解之@ControllerAdvice
查看>>
SQL--查询两个字段相同的记录
查看>>
多研究些架构,少谈些框架(1) -- 论微服务架构的核心概念
查看>>
多研究些架构,少谈些框架(2)-- 微服务和充血模型
查看>>
多研究些架构,少谈些框架(3)-- 微服务和事件驱动
查看>>
SQL性能优化梳理
查看>>
微服务架构技术栈
查看>>
想面试进BAT,不得不看的分布式锁,面试题都在这里了!!
查看>>
Redis最常被问到知识点总结
查看>>
这才是微服务拆分的正确姿势,值得学习!
查看>>
MySQL中一条SQL是如何执行的?
查看>>
MySQL的索引是什么?怎么优化?
查看>>
2万字长文包教包会 JVM 内存结构
查看>>
不懂 spring 就彻底放弃 Java 吧!
查看>>
从MySQL高可用架构看高可用架构设计
查看>>
可以秒杀全场的SpringCloud微服务电商实战项目,文档贼全!
查看>>
java架构之路(多线程)synchronized详解以及锁的膨胀升级过程
查看>>
java架构之路(多线程)AQS之ReetrantLock显示锁的使用和底层源码解读
查看>>
百度现场面试:JVM+算法+Redis+数据库!(三面)
查看>>
java架构之路(多线程)JMM和volatile关键字
查看>>