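Multi-group consumption
In Dubbo, multi-group consumption means the consumer subscribes with group set to * or to a comma-separated list such as 188,1. Service governance sometimes calls for this form, for example in cross-region deployments.
Symptom
The consumer subscription is configured as follows: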
<dubbo:consumer check="false" timeout="60000" loadbalance="xxfirst"/>
<dubbo:reference id="provider1"
                 interface="com.tc.dubbo.api.Provider1"
                 registry="dubbo-registry"
                 check="false"
                 group="188,1"/>
In practice, the custom router strategy still takes effect, but the configured xxfirst load-balancing strategy no longer does.
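Source code analysis
Inside RegistryProtocol.doRefer(), cluster.join() creates the ClusterInvoker object. The logic that decides which invoker to call lives in that ClusterInvoker.
Dubbo's default cluster is FailoverCluster, which maps to FailoverClusterInvoker. It picks an invoker through the load-balancing strategy.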
FailoverClusterInvoker.java
    @SuppressWarnings({"unchecked", "rawtypes"})
    public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
        List<Invoker<T>> copyinvokers = invokers;
        checkInvokers(copyinvokers, invocation);
        int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;
        if (len <= 0) {
            len = 1;
        }
        // retry loop.
        RpcException le = null; // last exception.
        List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); // invoked invokers.
        Set<String> providers = new HashSet<String>(len);
        for (int i = 0; i < len; i++) {
            //Reselect before retry to avoid a change of candidate `invokers`.
            //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
            if (i > 0) {
                checkWhetherDestroyed();
                copyinvokers = list(invocation);
                // check again
                checkInvokers(copyinvokers, invocation);
            }
            // The load-balancing strategy is applied here
            Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);
            invoked.add(invoker);
            RpcContext.getContext().setInvokers((List) invoked);
            ...
        }
        ...
    }
}
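The retry loop above can be modelled in a few lines. This is only a toy sketch, not Dubbo code: ToyLoadBalance and FailoverSketch are made-up names, providers are plain strings, and every attempt is assumed to fail so that the loop runs to the end. The point it illustrates is that each attempt asks the load balancer to choose, preferring providers that have not been tried yet.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Dubbo's LoadBalance; not the real interface.
interface ToyLoadBalance {
    String select(List<String> candidates);
}

class FailoverSketch {
    // Returns the providers tried, in order, assuming every attempt fails.
    static List<String> attempts(List<String> providers, ToyLoadBalance lb, int retries) {
        List<String> invoked = new ArrayList<>();
        for (int i = 0; i <= retries; i++) {
            List<String> candidates = new ArrayList<>(providers);
            candidates.removeAll(invoked);        // prefer providers not tried yet
            if (candidates.isEmpty()) {
                candidates = providers;           // nothing fresh left, allow repeats
            }
            invoked.add(lb.select(candidates));   // the load balancer makes the choice
        }
        return invoked;
    }
}
```

With a strategy that always prefers the last candidate, three providers a, b, c and two retries, the attempts go to c, then b, then a.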
When group is * or a list such as 188,1, however, RegistryProtocol.refer() calls doRefer() with MergeableCluster instead.
RegistryProtocol.java
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
    url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
    Registry registry = registryFactory.getRegistry(url);
    if (RegistryService.class.equals(type)) {
        return proxyFactory.getInvoker((T) registry, type, url);
    }
    // group="a,b" or group="*"
    Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
    String group = qs.get(Constants.GROUP_KEY);
    // For multi-group references, doRefer is called with MergeableCluster
    if (group != null && group.length() > 0) {
        if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                || "*".equals(group)) {
            return doRefer(getMergeableCluster(), registry, type, url);
        }
    }
    return doRefer(cluster, registry, type, url);
}

private Cluster getMergeableCluster() {
    return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable");
}
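To see which group values trigger the mergeable branch, the check in refer() can be isolated into a small helper. This is a minimal sketch: GroupCheck and isMultiGroup are hypothetical names, and the regular expression is an assumption about what Constants.COMMA_SPLIT_PATTERN contains, so verify it against your Dubbo version.

```java
import java.util.regex.Pattern;

class GroupCheck {
    // Assumed equivalent of Constants.COMMA_SPLIT_PATTERN; not copied from the excerpt.
    static final Pattern COMMA_SPLIT = Pattern.compile("\\s*[,]+\\s*");

    // True when the group value would send refer() down the mergeable branch.
    static boolean isMultiGroup(String group) {
        if (group == null || group.isEmpty()) {
            return false;
        }
        return "*".equals(group) || COMMA_SPLIT.split(group).length > 1;
    }
}
```

Under that assumption, "188,1" and "*" count as multi-group, while a single group such as "188" keeps the default cluster.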
MergeableCluster maps to MergeableClusterInvoker, and its implementation does not go through the load-balancing strategy.
MergeableClusterInvoker.java
@SuppressWarnings("rawtypes")
public Result invoke(final Invocation invocation) throws RpcException {
    // The invokers are taken straight from the Directory; no load balancing is involved
    List<Invoker<T>> invokers = directory.list(invocation);
    ......
    return new RpcResult(result);
}
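The practical difference can be shown with a toy model. Everything here is hypothetical: MergeableSketch and Provider are made-up names, and the "first available provider wins" rule is an assumption about the elided part of invoke() when no merger is configured, not something the excerpt shows. The sketch only illustrates why a configured strategy has no say once the invoker list is used directly.

```java
import java.util.List;

class MergeableSketch {
    // Hypothetical provider handle standing in for Dubbo's Invoker.
    static final class Provider {
        final String name;
        final boolean available;

        Provider(String name, boolean available) {
            this.name = name;
            this.available = available;
        }
    }

    // Toy load-balanced path: a made-up strategy that prefers the last provider.
    static Provider viaLoadBalance(List<Provider> routed) {
        return routed.get(routed.size() - 1);
    }

    // Assumed mergeable-style path: no strategy is consulted, the first
    // available provider in list order wins.
    static Provider firstAvailable(List<Provider> routed) {
        for (Provider p : routed) {
            if (p.available) {
                return p;
            }
        }
        return routed.get(0); // nothing available: fall back to the first entry
    }
}
```

For a routed list a (unavailable), b, c, the toy strategy would pick c, while the list-order path returns b regardless of which strategy is configured.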
RegistryDirectory extends AbstractDirectory, whose list() method applies the router logic, which is why the routers still take effect.
AbstractDirectory.java
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    List<Invoker<T>> invokers = doList(invocation);
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) { // router filtering
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}
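The router chain in list() boils down to passing the invoker list through each router in turn and carrying on if one of them fails. A minimal sketch of that idea, with assumptions: RouterChainSketch is a made-up name, providers are plain strings, routers are modelled as UnaryOperator functions, the runtime-flag check is left out, and only RuntimeException is caught (the original catches Throwable).

```java
import java.util.List;
import java.util.function.UnaryOperator;

class RouterChainSketch {
    // Applies each router in order; a router that throws is skipped and the
    // list produced by the previous routers is kept.
    static List<String> list(List<String> all, List<UnaryOperator<List<String>>> routers) {
        List<String> invokers = all;
        for (UnaryOperator<List<String>> router : routers) {
            try {
                invokers = router.apply(invokers);
            } catch (RuntimeException e) {
                // a failing router must not break the call; the original logs the error here
            }
        }
        return invokers;
    }
}
```

Because this filtering happens inside the directory, it runs no matter which ClusterInvoker later consumes the list, which matches the observation that routers keep working under multi-group consumption.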