SpringCloud之ribbon的使用以及源码解析

ribbon源码解析

ribbon简介

ribbon是一个客户端的负载均衡工具,Ribbon客户端组件提
供一系列的完善的配置,如超时,重试等。通过Load Balancer获取到服务提供的所有机器实例,
Ribbon会自动基于某种规则(轮询,随机)去调用这些服务

ribbon的使用

new出来一个RestTemplate,然后上面加上@LoadBalanced

@Configuration
public class RestTemplateConfig {

    @Bean
    @LoadBalanced
    public RestTemplate getRestTemplate(){
       return new RestTemplate();
    }

}

然后使用restTemplate.getForObject就能实现客户端的负载均衡了。

    @GetMapping(value = "/nacos/comsumer/port/{id}")
    public String getPort(@PathVariable("id") Long id) {
        return restTemplate.getForObject(serverUrl+"/payment/nacos/"+ id,String.class);
    }

ribbon几大核心接口

IRule

服务器会根据配置的IRule,在serverList中选取一台
SpringCloud之ribbon的使用以及源码解析_第1张图片

  1. RandomRule: 随机选择一个状态为up的Server。
  2. RetryRule: 对选定的负载均衡策略机上重试机制,在一个配置时间段内当选择Server不成
    功,则一直尝试使用subRule的方式选择一个可用的server。
  3. RoundRobinRule: 轮询选择, 轮询index,选择index对应位置的Server。
  4. AvailabilityFilteringRule: 过滤掉一直连接失败的被标记为circuit tripped的后端Server,并
    过滤掉那些高并发的后端Server或者使用一个AvailabilityPredicate来包含过滤server的逻辑,其
    实就是检查status里记录的各个Server的运行状态。
  5. BestAvailableRule: 选择一个最小的并发请求的Server,逐个考察Server,如果Server被
    tripped了,则跳过。
  6. WeightedResponseTimeRule: 根据响应时间加权,响应时间越长,权重越小,被选中的可
    能性越低。
  7. ZoneAvoidanceRule: 默认的负载均衡策略,即复合判断Server所在区域的性能和
    Server的可用性选择Server,在没有区域的环境下,类似于轮询(RandomRule)
  8. NacosRule: 同集群优先调用
IPing

SpringCloud之ribbon的使用以及源码解析_第2张图片

  1. DummyPing 默认的策略,isAlive默认返回true
  2. NIWSDiscoveryPing ,isAlive根据注册中心的状态,up的就返回true
  3. NoOpPing isAlive默认返回true
  4. PingUrl 拼装http请求去访问服务提供方,返回200就返回true
  5. PingConstant 根据配置的constantStr是true返回true,否则返回false
ILoadBalancer

SpringCloud之ribbon的使用以及源码解析_第3张图片

  1. ZoneAwareLoadBalancer 默认的负载均衡器
  2. NoOpLoadBalancer 什么都不做的。。。。

ribbon源码解析

源码流程图

SpringCloud之ribbon的使用以及源码解析_第4张图片

初始化工作

核心注解就是@LoadBalanced,现在分析一下LoadBalanced注解。
代码是springboot的starter,所以找META-INF\spring.factories
首先看LoadBalancerAutoConfiguration

@ConditionalOnClass(RestTemplate.class)
@ConditionalOnBean(LoadBalancerClient.class)
@EnableConfigurationProperties(LoadBalancerRetryProperties.class)
public class LoadBalancerAutoConfiguration {

由于引入了ribbon
@ConditionalOnClass(RestTemplate.class)是肯定满足的,
@ConditionalOnBean(LoadBalancerClient.class),要有LoadBalancerClient这个bean,所以又要去找LoadBalancerClient是在哪里放进去bean工厂的。
LoadBalancerClient是一个接口,去找他的实现类RibbonLoadBalancerClient
SpringCloud之ribbon的使用以及源码解析_第5张图片
然后就看RibbonLoadBalancerClient什么时候放到bean,
找到同包下面的META-INF\spring.factories,只有一个RibbonAutoConfiguration。


@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,
		AsyncLoadBalancerAutoConfiguration.class })
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class,
		ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration 
	@Bean
	@ConditionalOnMissingBean(LoadBalancerClient.class)
	public LoadBalancerClient loadBalancerClient() {
		return new RibbonLoadBalancerClient(springClientFactory());
	}

从注解中可以看到AutoConfigureBefore,说明这个类在LoadBalancerAutoConfiguration前执行,当容器中少了LoadBalancerClient,就会往容器里面加入RibbonLoadBalancerClient。

RibbonLoadBalancerClient有了,然后就回到LoadBalancerAutoConfiguration ,看看都往容器里面加入了什么东西

可以拿到所有被@LoadBalanced注解的RestTemplate,原因是LoadBalanced 注解上面有@Qualifier,作用相当于@Qualifier

	@LoadBalanced
	@Autowired(required = false)
	private List<RestTemplate> restTemplates = Collections.emptyList();

@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {
@Bean
	public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
			final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
		return () -> restTemplateCustomizers.ifAvailable(customizers -> {
			for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
				for (RestTemplateCustomizer customizer : customizers) {
					customizer.customize(restTemplate);
				}
			}
		});
	}

	@Bean
	@ConditionalOnMissingBean
	public LoadBalancerRequestFactory loadBalancerRequestFactory(
			LoadBalancerClient loadBalancerClient) {
		return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
	}


		@Bean
		public LoadBalancerInterceptor ribbonInterceptor(
				LoadBalancerClient loadBalancerClient,
				LoadBalancerRequestFactory requestFactory) {
			return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
		}

		@Bean
		@ConditionalOnMissingBean
		public RestTemplateCustomizer restTemplateCustomizer(
				final LoadBalancerInterceptor loadBalancerInterceptor) {
			return restTemplate -> {
				List<ClientHttpRequestInterceptor> list = new ArrayList<>(
						restTemplate.getInterceptors());
				list.add(loadBalancerInterceptor);
				restTemplate.setInterceptors(list);
			};
		}

就往容器中添加了LoadBalancerInterceptor ,然后LoadBalancerInterceptor 有作为参数,传进去构造RestTemplateCustomizer,然后RestTemplateCustomizer 又作为参数传进去构造SmartInitializingSingleton。

其实就是定制化RestTemplate,往RestTemplate加入LoadBalancerInterceptor。

restTemplate的调用

因为上面定制了一个拦截器LoadBalancerInterceptor到restTemplate中,所以直接打断点到LoadBalancerInterceptor的intercept方法。
调用链如下
SpringCloud之ribbon的使用以及源码解析_第6张图片

@Override
	public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
			final ClientHttpRequestExecution execution) throws IOException {
			//拿到请求的uri
		final URI originalUri = request.getURI();
		//拿到serviceName 
		String serviceName = originalUri.getHost();
		Assert.state(serviceName != null,
				"Request URI does not contain a valid hostname: " + originalUri);
				//真正干过的代码
		return this.loadBalancer.execute(serviceName,
				this.requestFactory.createRequest(request, body, execution));
	}

loadBalancer就是上面传进来的RibbonLoadBalancerClient

然后就看RibbonLoadBalancerClient的execute方法

	public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
			throws IOException {
			//拿到ILoadBalancer 
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		// 拿到server
		Server server = getServer(loadBalancer, hint);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server,
				isSecure(server, serviceId),
				serverIntrospector(serviceId).getMetadata(server));

		return execute(serviceId, ribbonServer, request);
	}

先看第一行代码ILoadBalancer loadBalancer = getLoadBalancer(serviceId);这是从bean中拿到ILoadBalancer ,然后又要找ILoadBalancer 什么时候放进去spring容器中的,在RibbonClientConfiguration找到了注入的代码

	@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}

在这个方法里面又传入了一堆的参数
IClientConfig config

我的项目是基于nacos的,所以ServerList是NacosServerList
ServerList serverList,

	@Bean
	@ConditionalOnMissingBean
	public ServerList<?> ribbonServerList(IClientConfig config,
			NacosDiscoveryProperties nacosDiscoveryProperties) {
		NacosServerList serverList = new NacosServerList(nacosDiscoveryProperties);
		serverList.initWithNiwsConfig(config);
		return serverList;
	}

ServerListFilter serverListFilter,
IRule rule

	@Bean
	@ConditionalOnMissingBean
	public IRule ribbonRule(IClientConfig config) {
		if (this.propertiesFactory.isSet(IRule.class, name)) {
			return this.propertiesFactory.get(IRule.class, config, name);
		}
		ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
		rule.initWithNiwsConfig(config);
		return rule;
	}

IPing ping

	@Bean
	@ConditionalOnMissingBean
	public IPing ribbonPing(IClientConfig config) {
		if (this.propertiesFactory.isSet(IPing.class, name)) {
			return this.propertiesFactory.get(IPing.class, config, name);
		}
		return new DummyPing();
	}

ServerListUpdater serverListUpdater

	@Bean
	@ConditionalOnMissingBean
	public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
		return new PollingServerListUpdater(config);
	}

这些属性都齐了之后,我们再回去看new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
SpringCloud之ribbon的使用以及源码解析_第7张图片
构造方法里面又调用了他父类DynamicServerListLoadBalancer的构造方法,DynamicServerListLoadBalancer构造方法里面又调用了父类BaseLoadBalancer的构造方法。

BaseLoadBalancer构造方法

    public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
        initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
    }

initWithConfig方法,里面主要有1个重要的方法
setPingInterval(pingIntervalTime);开启一个定时地线程去定时ping一下和服务提供方

    void setupPingTask() {
        if (canSkipPing()) {
            return;
        }
        if (lbTimer != null) {
            lbTimer.cancel();
        }
        lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
                true);
        lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
        forceQuickPing();
    }

看到PingTask的run方法,再进入BaseLoadBalancer的runPinger,根据配置的Iping策略去定时ping服务提供方,判断server是否还在线

 public void runPinger() throws Exception {
            if (!pingInProgress.compareAndSet(false, true)) { 
                return; // Ping in progress - nothing to do
            }
            
            // we are "in" - we get to Ping

            Server[] allServers = null;
            boolean[] results = null;

            Lock allLock = null;
            Lock upLock = null;

            try {
                /*
                 * The readLock should be free unless an addServer operation is
                 * going on...
                 */
                allLock = allServerLock.readLock();
                allLock.lock();
                allServers = allServerList.toArray(new Server[allServerList.size()]);
                allLock.unlock();

                int numCandidates = allServers.length;
                results = pingerStrategy.pingServers(ping, allServers);

                final List<Server> newUpList = new ArrayList<Server>();
                final List<Server> changedServers = new ArrayList<Server>();

                for (int i = 0; i < numCandidates; i++) {
                    boolean isAlive = results[i];
                    Server svr = allServers[i];
                    boolean oldIsAlive = svr.isAlive();

                    svr.setAlive(isAlive);

                    if (oldIsAlive != isAlive) {
                        changedServers.add(svr);
                        logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}", 
                    		name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
                    }

                    if (isAlive) {
                        newUpList.add(svr);
                    }
                }
                upLock = upServerLock.writeLock();
                upLock.lock();
                upServerList = newUpList;
                upLock.unlock();

                notifyServerStatusChangeListener(changedServers);
            } finally {
                pingInProgress.set(false);
            }
        }
    }

然后回到DynamicServerListLoadBalancer的构造方法,
又有一行关键的代码

restOfInit(clientConfig);

进入方法restOfInit

void restOfInit(IClientConfig clientConfig) {
        boolean primeConnection = this.isEnablePrimingConnections();
        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
        this.setEnablePrimingConnections(false);
        enableAndInitLearnNewServersFeature();

        updateListOfServers();
        if (primeConnection && this.getPrimeConnections() != null) {
            this.getPrimeConnections()
                    .primeConnections(getReachableServers());
        }
        this.setEnablePrimingConnections(primeConnection);
        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
    }

先看enableAndInitLearnNewServersFeature(),serverListUpdater就是刚刚初始化的PollingServerListUpdater

    public void enableAndInitLearnNewServersFeature() {
        LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
        serverListUpdater.start(updateAction);
    }

其实就是执行PollingServerListUpdater的start方法,这里又开启了一个线程,去执行DynamicServerListLoadBalancer的doUpdate方法

 public void doUpdate() {
            updateListOfServers();
        }

进入updateListOfServers()

 servers = serverListImpl.getUpdatedListOfServers();
 updateAllServerList(servers);
 

去nacos的服务表拿到服务列表,更新到自己的本地缓存。

然后就ZoneAwareLoadBalancer构造完成了,所以ILoadBalancer loadBalancer = getLoadBalancer(serviceId)拿到的就是ZoneAwareLoadBalancer。

进入第二步,拿到server

Server server = getServer(loadBalancer, hint);

再进入getServer

loadBalancer.chooseServer(hint != null ? hint : "default");

进入BaseLoadBalancer的chooseServer

 public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

rule默认是刚刚注入的ZoneAvoidanceRule,里面没有choose方法,找他的父类PredicateBasedRule

    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }

在ILoadBalancer 的server列表中选出server

 public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }

算法在AbstractServerPredicate#getEligibleServers。
就这样,完成了server地址的替换,再可以去请求服务端了。

你可能感兴趣的