SpringCloud源码解析 -- Eureka原理探究

本文通过阅读Eureka源码,分享Eureka的实现原理。
本文主要梳理Eureka整体设计及实现,并不一一列举Eureka源码细节。

源码分析基于Spring Cloud Hoxton,Eureka版本为1.9

Eureka分为Eureka Client,Eureka Server,多个Eureka Server节点组成一个Eureka集群,服务通过Eureka Client注册到Eureka Server。
SpringCloud源码解析 -- Eureka原理探究_第1张图片

CAP理论指出,一个分布式系统不可能同时满足C(一致性)、A(可用性)和P(分区容错性)。
由于分布式系统中必须保证分区容错性,因此我们只能在A和C之间进行权衡。
Zookeeper保证的是CP, 而Eureka则是保证AP。
为什么呢?
在注册中心这种场景中,可用性比一致性更重要。
作为注册中心,其实数据是不经常变更的,只有服务发布,机器上下线,服务扩缩容时才变更。
因此Eureka选择AP,即使出问题了,也返回旧数据,保证服务能(最大程度)正常调用, 避免出现因为注册中心的问题导致服务不可用这种得不偿失的情况。
所以,Eureka各个节点都是平等的(去中心化的架构,无master/slave区分),挂掉的节点不会影响正常节点的工作,剩余的节点依然可以提供注册和查询服务。

Eureka Client

Eureka 1.9只要引入spring-cloud-starter-netflix-eureka-client依赖,即使不使用@EnableDiscoveryClient或@EnableEurekaClient注解,服务也会注册到Eureka集群。

client主要逻辑在com.netflix.discovery.DiscoveryClient实现,EurekaClientAutoConfiguration中构建了其子类CloudEurekaClient。

定时任务

DiscoveryClient#initScheduledTasks方法设置定时任务,主要有CacheRefreshThread,HeartbeatThread,以及InstanceInfoReplicator。

同步

服务注册信息缓存在DiscoveryClient#localRegionApps变量中,CacheRefreshThread负责定时从Eureka Server读取最新的服务注册信息,更新到本地缓存。
CacheRefreshThread -> DiscoveryClient#refreshRegistry -> DiscoveryClient#fetchRegistry
当存在多个Eureka Server节点时,Client会与eureka.client.serviceUrl.defaultZone配置的第一个Server节点同步数据,当第一个Server节点同步失败,才会同步第二个节点,以此类推。

从DiscoveryClient#fetchRegistry可以看到,同步数据有两个方法
(1)全量同步
由DiscoveryClient#getAndStoreFullRegistry方法实现,通过Http Get调用Server接口apps/
获取Server节点中所有服务注册信息替换DiscoveryClient#localRegionApps

注意:Client请求Server端的服务,都是通过EurekaHttpClient接口发起,该接口实现类EurekaHttpClientDecorator通过RequestExecutor接口将请求委托给其他EurekaHttpClient实现类,并提供execute方法给子类实现扩展处理(该扩展处理可以针对每一个EurekaHttpClient方法,类似AOP)。子类RetryableEurekaHttpClient#execute中,会获取eureka.client.service-url.defaultZone中配置的地址,通过TransportClientFactory#newClient,构造一个RestTemplateTransportClientFactory,再真正发起请求。

(2)增量同步
由DiscoveryClient#getAndUpdateDelta方法实现,通过Http Get调用Server接口apps/delta,获取最新ADDED、MODIFIED,DELETED操作,更新本地缓存。
如果获取最新操作失败,则会发起全量同步。

配置:
eureka.client.fetch-registry,是否定时同步信息,默认true
eureka.client.registry-fetch-interval-seconds,间隔多少秒同步一次服务注册信息,默认30

心跳

HeartbeatThread -> DiscoveryClient#renew -> EurekaHttpClient#sendHeartBeat
通过Http Put调用Server接口apps/{appName}/{instanceId}
appName是服务的spring.application.name,instanceId是服务IP加服务端口。

注意:如果Server返回NOT_FOUND状态,则重新注册。

配置:
eureka.client.register-with-eureka,当前应用是否注册到Eureka集群,默认true
eureka.instance.lease-renewal-interval-in-seconds,间隔多少秒发送一次心跳,默认30

注册

DiscoveryClient#构造函数 -> DiscoveryClient#register
通过Http Post调用Server接口apps/{appName},发送当前应用的注册信息到Server。
配置:
eureka.client.register-with-eureka,当前应用是否注册到Eureka集群,默认true
eureka.client.should-enforce-registration-at-init,是否在初始化时注册,默认false

InstanceInfoReplicator

InstanceInfoReplicator任务会去监测应用自身的IP信息以及配置信息是否发生改变,如果发生改变,则会重新发起注册。
配置:
eureka.client.initial-instance-info-replication-interval-seconds,间隔多少秒检查一次自身信息,默认40

下线

EurekaClientAutoConfiguration配置了CloudEurekaClient的销毁方法

@Bean(destroyMethod = "shutdown")

DiscoveryClient#shutdown方法完成下线的处理工作,包括取消定时任务,调用unregister方法(通过Http Delete调用Server接口apps/{appName}/{id}),取消监控任务等

Eureka Server

@EnableEurekaServer引入EurekaServerMarkerConfiguration,EurekaServerMarkerConfiguration构建EurekaServerMarkerConfiguration.Marker。
EurekaServerAutoConfiguration会在Spring上下文中存在EurekaServerMarkerConfiguration.Marker时生效,构造Server端组件类。

Eureka Server也要使用DiscoveryClient,拉取其他Server节点的服务注册信息或者将自身注册到Eureka集群中。

启动同步

Server启动时,需要从相邻Server节点获取服务注册信息,同步到自身内存。

Server的服务注册信息存放在AbstractInstanceRegistry#registry变量中,类型为ConcurrentHashMap>>。
外层Map Key为appName,外层Map Key为instanceId,Lease代表Client与Server之间维持的一个契约。InstanceInfo保存具体的服务注册信息,如instanceId,appName,ipAddr,port等。

EurekaServerBootstrap是Server端的启动引导类,EurekaServerInitializerConfiguration实现了Lifecycle接口,start方法调用eurekaServerBootstrap.contextInitialized完成Server端初始化。
eurekaServerBootstrap.contextInitialized -> EurekaServerBootstrap#initEurekaServerContext -> PeerAwareInstanceRegistryImpl#syncUp -> AbstractInstanceRegistry#register
PeerAwareInstanceRegistryImpl#syncUp调用DiscoveryClient#getApplications方法,获取相邻server节点的所有服务注册信息,再调用AbstractInstanceRegistry#register方法,注册到AbstractInstanceRegistry#registry变量中。

AbstractInstanceRegistry#register

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        ...
        // #1
        Lease existingLease = gMap.get(registrant.getId());    
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            ...
            // #2
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {    
                registrant = existingLease.getHolder();
            }
        } else {
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    // #3
                    updateRenewsPerMinThreshold();    
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease lease = new Lease(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // #4
        gMap.put(registrant.getId(), lease);    
        ...
        registrant.setActionType(ActionType.ADDED);
        // #5
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));    
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());    
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}

#1 通过appName,instanceId查询已有的Lease
#2 如果该服务已存在Lease,并且LastDirtyTimestamp的值更大,使用已存在的Lease。
#3 更新numberOfRenewsPerMinThreshold,该值用于自我保护模式。
#4 构建一个新的Lease,添加到AbstractInstanceRegistry#registry缓存中。
#5 添加recentlyChangedQueue,apps/delta接口从中获取最新变更操作。

提供服务

Server通过ApplicationsResource/ApplicationResource/InstanceResource对外提供Http服务。

AbstractInstanceRegistry负责实现cancle,register,renew,statusUpdate,deleteStatusOverride等操作的业务逻辑。
PeerAwareInstanceRegistryImpl通过replicateToPeers方法将操作同步到其他节点,以保证集群节点数据同步。
PeerAwareInstanceRegistryImpl#replicateToPeers方法最后一个参数isReplication,决定是否需要进行同步。
如果Server节点接收到其他Server节点发送的同步操作,是不需要再继续向其他Server同步的,否则会引起循环更新。
该参数通过Http Requst的Header参数x-netflix-discovery-replication决定(只有Client发送的请求该参数才为true)。

数据一致

PeerAwareInstanceRegistryImpl#replicateToPeers方法通过PeerEurekaNodes#getPeerEurekaNodes获取其他server节点地址,
PeerEurekaNodes#peerEurekaNodes变量维护了所有的Server节点信息。

PeerEurekaNodes通过peersUpdateTask任务定时从DNS或配置文件获取最新的Server节点地址列表,并更新PeerEurekaNodes#peerEurekaNodes。
配置:
eureka.server.peer-eureka-nodes-update-interval-ms,间隔多少分钟拉取一次Server节点地址列表,默认10

PeerEurekaNode管理具体一个Server节点,并负责向该Server节点同步register,cancel,heartbeat等操作。
PeerEurekaNode通过定时任务的方式同步这些操作。它维护了两个TaskDispatcher,批处理调度器batchingDispatcher和非批处理调度器nonBatchingDispatcher。
PeerEurekaNode#构造方法调用TaskDispatchers#createBatchingTaskDispatcher构造TaskDispatcher

public static  TaskDispatcher createBatchingTaskDispatcher(String id,
                                                                         int maxBufferSize,
                                                                         int workloadSize,
                                                                         int workerCount,
                                                                         long maxBatchingDelay,
                                                                         long congestionRetryDelayMs,
                                                                         long networkFailureRetryMs,
                                                                         TaskProcessor taskProcessor) {
    final AcceptorExecutor acceptorExecutor = new AcceptorExecutor<>(
            id, maxBufferSize, workloadSize, maxBatchingDelay, congestionRetryDelayMs, networkFailureRetryMs
    );
    final TaskExecutors taskExecutor = TaskExecutors.batchExecutors(id, workerCount, taskProcessor, acceptorExecutor);
    return new TaskDispatcher() {
        public void process(ID id, T task, long expiryTime) {
            acceptorExecutor.process(id, task, expiryTime);
        }

        public void shutdown() {
            acceptorExecutor.shutdown();
            taskExecutor.shutdown();
        }
    };
}

TaskDispatcher负责任务分发,过期任务会被抛弃,如果两个任务有相同id,则前一个任务则会被删除。
AcceptorExecutor负责整合任务,将任务放入批次中。
TaskExecutors将整合好的任务(批次)分给TaskProcessor处理,实际处理任务的是ReplicationTaskProcessor。
ReplicationTaskProcessor可以重复执行失败的任务,ReplicationTaskProcessor#process(List tasks)处理批次任务,将tasks合并到一个请求,发送到下游Server接口peerreplication/batch/
任务类为ReplicationTask,它提供了handleFailure方法,当下游Server接口返回statusCode不在[200,300)区间,则调用该方法。

从TaskExecutors#BatchWorkerRunnable的run方法可以看到,
调用下游Server接口时,如果下游返回503状态或发生IO异常,会通过taskDispatcher.reprocess重新执行任务,以保证最终一致性。
如果发生其他异常,只打印日志,不重复执行任务。

配置:
eureka.server.max-elements-in-peer-replication-pool,等待执行任务最大数量,默认为10000

需要注意一下PeerEurekaNode#heartbeat方法,心跳任务实现了handleFailure方法

public void handleFailure(int statusCode, Object responseEntity) throws Throwable {
    super.handleFailure(statusCode, responseEntity);
    if (statusCode == 404) {
        logger.warn("{}: missing entry.", getTaskName());
        if (info != null) {
            logger.warn("{}: cannot find instance id {} and hence replicating the instance with status {}",
                    getTaskName(), info.getId(), info.getStatus());
            register(info);
        }
    } 
    ...
}

如果下游server节点没有找到服务注册信息,就返回404状态,这时需要重新注册该服务。这点很重要,它可以保证不同Server节点保持数据一致。

假设有一个client,注册到Eureka集群server1,server2,server3。下面来分析两个场景
场景1. client启动时,server1接收带client的注册信息,但同步给server2前宕机了,怎么办?
这时,client定时发起心跳,但它与server1心跳操作失败,只能向server2发起心跳,server2返回404(NOT_FOUND状态),client重新注册。

场景2. server3与其他机器server1,server2之间出现了网络分区,这时client注册到eureka集群。然后网络恢复了,server3怎么同步数据呢?
当server1向server3同步心跳时,server3返回404,于是server1重新向server3注册client信息,数据最终保持一致。

主动失效

AbstractInstanceRegistry#deltaRetentionTimer任务会定时移除recentlyChangedQueue中过期的增量操作信息
配置:
eureka.server.delta-retention-timer-interval-in-ms,间隔多少秒清理一次过期的增量操作信息,默认30
eureka.server.retention-time-in-m-s-in-delta-queue,增量操作保留多少分钟,默认3

AbstractInstanceRegistry#evictionTimer任务会定时剔除AbstractInstanceRegistry#registry中已经过期的(太久没收到心跳)服务注册信息。
计算服务失效时间时还要加上补偿时间,即计算本次任务执行的时间和上次任务执行的时间差,若超过eviction-interval-timer-in-ms配置值则加上超出时间差作为补偿时间。
每次剔除服务的数量都有一个上限,为注册服务数量*renewal-percent-threshold,Eureka会随机剔除过期的服务。
配置:
eureka.server.eviction-interval-timer-in-ms,间隔多少秒清理一次过期的服务,默认60
eureka.instance.lease-expiration-duration-in-seconds,间隔多少秒没收到心跳则判定服务过期,默认90
eureka.server.renewal-percent-threshold,自我保护阀值因子,默认0.85

自我保护机制

PeerAwareInstanceRegistryImpl#scheduleRenewalThresholdUpdateTask,定时更新numberOfRenewsPerMinThreshold,该值用于判定是否进入自我保护模式,在自我保护模式下,AbstractInstanceRegistry#evictionTimer任务直接返回,不剔除过期服务。

numberOfRenewsPerMinThreshold计算在PeerAwareInstanceRegistryImpl#updateRenewsPerMinThreshold

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

expectedNumberOfClientsSendingRenews -> 已注册服务总数
60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds() -> expected-client-renewal-interval-seconds配置了Client间隔多少秒发一次心跳,这里计算一个Client每分钟发送心跳数量。
RenewalPercentThreshold 自我保护阀值因子。
可以看到,numberOfRenewsPerMinThreshold表示一分钟内Server接收心跳最低次数,实际数量少于该值则进入自我保护模式。
此时Eureka认为客户端与注册中心出现了网络故障(比如网络故障或频繁的启动关闭客户端),不再剔除任何服务,它要等待网络故障恢复后,再退出自我保护模式。这样可以最大程度保证服务间正常调用。

PeerAwareInstanceRegistryImpl#isLeaseExpirationEnabled方法判定当前是否处于自我保护模式。该方法比较renewsLastMin中的值是否大于numberOfRenewsPerMinThreshold,AbstractInstanceRegistry#renewsLastMin统计一分钟内心跳次数。
配置:
eureka.server.enable-self-preservation,是否启用自我保护机制,默认为true
eureka.server.expected-client-renewal-interval-seconds,Client间隔多少秒发送一次心跳
eureka.server.renewal-percent-threshold,自我保护阀值因子,默认0.85

状态更新

InstanceInfo维护了状态变量status和覆盖状态变量overriddenStatus。
status是Eureka Client本身发布的状态。
overriddenstatus是手动或通过工具强制执行的状态。
Server端提供服务apps/{appName}/{instanceId}/status,可以变更服务实例status以及overriddenStatus,从而主动变更服务状态。
注意,并不会修改Client端的服务状态,而是修改Server段服务注册信息中保存的服务状态。
而Server处理Client注册或心跳时,会使用overriddenstatus覆盖status。
Eureka Client在获取到注册信息时,会调用DiscoveryClient#shuffleInstances方法,过滤掉非InstanceStatus.UP状态的服务实例,从而避免调动该实例,以达到服务实例的暂停服务,而无需关闭服务实例。

InstanceInfo还维护了lastDirtyTimestamp变量,代表服务注册信息最后更新时间。
从InstanceResource可以看到,更新状态statusUpdate或者删除状态deleteStatusUpdate时都可以提供lastDirtyTimestamp,
而处理心跳的renewLease方法,必须有lastDirtyTimestamp参数,validateDirtyTimestamp方法负责检验lastDirtyTimestamp参数

  1. 当lastDirtyTimestamp参数等于当前注册信息中的lastDirtyTimestamp,返回处理成功。
  2. 当lastDirtyTimestamp参数大于当前注册信息中的lastDirtyTimestamp,返回NOT_FOUND状态,表示Client的信息已经过期,需要重新注册。
  3. 当lastDirtyTimestamp参数小于当前注册信息中的lastDirtyTimestamp,返回CONFLICT(409)状态,表示数据冲突,并返回当前节点中该服务的注册信息。

这时如果心跳是Client发起的,Client会忽略409的返回状态(DiscoveryClient#renew),但如果是其他Server节点同步过来的,发送心跳的Server节点会使用返回的服务注册信息更新本节点的注册信息(PeerEurekaNode#heartbeat)。

配置:
eureka.client.filter-only-up-instances,获取实例时是否只保留UP状态的实例,默认为true
eureka.server.sync-when-timestamp-differs,当时间戳不一致时,是否进行同步数据,默认为true

文本关于Eureka的分享就到这里,我们可以Eureka设计和实现都比较简单,但是非常实用。
我在深入阅读Eureka源码前犹豫了一段时间(毕竟Eureka 2.0 开源流产),不过经过一段时间深入学习,收获不少,希望这篇文章也可以给对Eureka感兴趣的同学提供一个深入学习思路。

如果您觉得本文不错,欢迎关注我的微信公众号,您的关注是我坚持的动力!

你可能感兴趣的