限流实现-Eureka-client 的RateLimiter

/*
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.netflix.discovery.util;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Rate limiter implementation is based on token bucket algorithm. There are two parameters:
 * <ul>
 * <li>burst size - maximum number of requests allowed into the system as a burst</li>
 * <li>average rate - expected number of requests per second (RateLimiters using MINUTES is also supported)</li>
 * </ul>
 *
 * <p>State is held in two atomics and updated with compare-and-set loops; no locks are taken.
 * Both parameters are supplied on every {@code acquire} call rather than at construction time.
 *
 * @author Tomasz Bak
 */
public class RateLimiter {

    /** Number of milliseconds in one unit of the configured average rate (1000 or 60000). */
    private final long rateToMsConversion;

    /** Tokens currently consumed from the bucket; the bucket is full when this reaches the burst size. */
    private final AtomicInteger consumedTokens = new AtomicInteger();

    /** Timestamp (ms) up to which refills have been accounted for; 0 means no refill has happened yet. */
    private final AtomicLong lastRefillTime = new AtomicLong(0);

    /**
     * Creates a rate limiter whose average rate is expressed per second.
     *
     * @deprecated Use {@link #RateLimiter(TimeUnit)} to state the rate unit explicitly.
     */
    @Deprecated
    public RateLimiter() {
        this(TimeUnit.SECONDS);
    }

    /**
     * Creates a rate limiter whose average rate is expressed per the given unit.
     *
     * @param averageRateUnit unit of the average rate; only {@code SECONDS} and {@code MINUTES} are accepted
     * @throws IllegalArgumentException if any other unit is supplied
     */
    public RateLimiter(TimeUnit averageRateUnit) {
        switch (averageRateUnit) {
            case SECONDS:
                rateToMsConversion = 1000;
                break;
            case MINUTES:
                rateToMsConversion = 60 * 1000;
                break;
            default:
                throw new IllegalArgumentException("TimeUnit of " + averageRateUnit + " is not supported");
        }
    }

    /**
     * Tries to take one token, using {@link System#currentTimeMillis()} as the current time.
     *
     * @param burstSize   maximum number of tokens that may be outstanding at once
     * @param averageRate number of tokens replenished per configured time unit
     * @return {@code true} if the request is allowed, {@code false} if it is rate limited
     */
    public boolean acquire(int burstSize, long averageRate) {
        return acquire(burstSize, averageRate, System.currentTimeMillis());
    }

    /**
     * Tries to take one token at the supplied point in time.
     *
     * @param burstSize         maximum number of tokens that may be outstanding at once
     * @param averageRate       number of tokens replenished per configured time unit
     * @param currentTimeMillis current time in milliseconds
     * @return {@code true} if the request is allowed (always the case when {@code burstSize}
     *         or {@code averageRate} is not positive), {@code false} if it is rate limited
     */
    public boolean acquire(int burstSize, long averageRate, long currentTimeMillis) {
        if (burstSize <= 0 || averageRate <= 0) { // Instead of throwing exception, we just let all the traffic go
            return true;
        }

        refillToken(burstSize, averageRate, currentTimeMillis);
        return consumeToken(burstSize);
    }

    /**
     * Returns tokens to the bucket for the time elapsed since the last accounted refill.
     * Callers guarantee {@code averageRate > 0}.
     */
    private void refillToken(int burstSize, long averageRate, long currentTimeMillis) {
        long refillTime = lastRefillTime.get();
        long timeDelta = currentTimeMillis - refillTime;
        if (timeDelta <= 0) {
            // No time has passed (or the supplied clock went backwards): nothing to refill.
            return;
        }

        long newTokens;
        long newRefillTime;
        if (timeDelta > Long.MAX_VALUE / averageRate) {
            // timeDelta * averageRate would overflow long. This is easy to hit on the very first
            // call, where refillTime is 0 and timeDelta is the whole epoch timestamp. The real
            // token count is then at least Long.MAX_VALUE / rateToMsConversion, far above any int
            // burst size, so treat it as a full refill accounted up to the current time.
            newTokens = burstSize;
            newRefillTime = currentTimeMillis;
        } else {
            newTokens = timeDelta * averageRate / rateToMsConversion;
            if (newTokens <= 0) {
                return;
            }
            // Advance only by the time that whole tokens account for, so the fractional
            // remainder is carried over to the next refill.
            newRefillTime = refillTime == 0
                    ? currentTimeMillis
                    : refillTime + newTokens * rateToMsConversion / averageRate;
        }

        if (!lastRefillTime.compareAndSet(refillTime, newRefillTime)) {
            // lastRefillTime changed concurrently; skip this refill attempt.
            return;
        }
        while (true) {
            int currentLevel = consumedTokens.get();
            int adjustedLevel = Math.min(currentLevel, burstSize); // In case burstSize decreased
            int newLevel = (int) Math.max(0, adjustedLevel - newTokens);
            if (consumedTokens.compareAndSet(currentLevel, newLevel)) {
                return;
            }
        }
    }

    /**
     * Takes one token if fewer than {@code burstSize} are currently consumed.
     *
     * @return {@code true} if a token was taken, {@code false} if the bucket is exhausted
     */
    private boolean consumeToken(int burstSize) {
        while (true) {
            int currentLevel = consumedTokens.get();
            if (currentLevel >= burstSize) {
                return false;
            }
            if (consumedTokens.compareAndSet(currentLevel, currentLevel + 1)) {
                return true;
            }
        }
    }

    /** Clears the consumed-token count and the last refill timestamp, returning to the initial state. */
    public void reset() {
        consumedTokens.set(0);
        lastRefillTime.set(0);
    }

}

你可能感兴趣的