节stream方法在N秒钟内调用M个请求
我需要一个组件/类,在N秒(或毫秒或毫微秒,无所谓)中将某些方法的执行限制为最大M个调用。
换句话说,我需要确保我的方法在N秒的滑动窗口中执行不超过M次。
如果你不知道现有的类可以自由发布你的解决scheme/想法你将如何实现这一点。
我会用一个固定大小为M的时间戳环形缓冲区 。每次调用方法时,都会检查最早的入口,如果过去less于N秒,则执行并添加另一个入口,否则,您将入睡为时差。
Google Guava RateLimiter是我开箱即用的。
// Allow one request per second private RateLimiter throttle = RateLimiter.create(1.0); private void someMethod() { throttle.acquire(); // Do something }
具体而言,你应该可以用DelayQueue
来实现这个。 用M
Delayed
实例初始化队列,延迟最初设置为零。 当对方法的请求进来时, take
一个令牌,导致方法阻塞,直到达到节stream要求。 当一个令牌已经被采取, add
一个新的令牌到延迟N
的队列。
阅读令牌桶algorithm。 基本上,你有一个桶里有令牌。 每次执行该方法时,都需要一个令牌。 如果没有更多的令牌,则阻塞,直到获得一个令牌。 同时,有一些外部的演员以固定的时间间隔补充代币。
我不知道有一个图书馆做这个(或类似的)。 你可以把这个逻辑写入代码中,或者使用AspectJ来添加行为。
这取决于应用程序。
想象一下, multithreading想让一个令牌做全局速率限制的动作 ,而不允许爆发 (即你想每10秒限制10个动作,但是你不想在第一个动作发生10个动作,然后保持9秒停止)。
DelayedQueue有一个缺点:线程请求令牌的顺序可能不是它们完成请求的顺序。 如果多个线程被阻塞等待一个令牌,那么不清楚哪一个线程将获取下一个可用的令牌。 在我看来,你甚至可以有线程永久等待。
一种解决方法是在两次连续的动作之间有一个最小的时间间隔 ,并按照请求的相同顺序采取行动。
这是一个实现:
public class LeakyBucket { protected float maxRate; protected long minTime; //holds time of last action (past or future!) protected long lastSchedAction = System.currentTimeMillis(); public LeakyBucket(float maxRate) throws Exception { if(maxRate <= 0.0f) { throw new Exception("Invalid rate"); } this.maxRate = maxRate; this.minTime = (long)(1000.0f / maxRate); } public void consume() throws InterruptedException { long curTime = System.currentTimeMillis(); long timeLeft; //calculate when can we do the action synchronized(this) { timeLeft = lastSchedAction + minTime - curTime; if(timeLeft > 0) { lastSchedAction += minTime; } else { lastSchedAction = curTime; } } //If needed, wait for our time if(timeLeft <= 0) { return; } else { Thread.sleep(timeLeft); } } }
虽然这不是你所要求的,但是ThreadPoolExecutor
也是有用的,它被devise用来在M秒内同时请求而不是M个请求。
我需要确保我的方法在N秒的滑动窗口中执行不超过M次。
我最近写了一篇博客文章,介绍如何在.NET中做到这一点。 您可能可以在Java中创build类似的东西。
.NET中更好的速率限制[Penned Objects]
如果您需要基于Java的滑动窗口速率限制器,可以在分布式系统上运行,那么您可能需要查看https://github.com/mokies/ratelimitj项目。;
Redis支持的configuration,将每分钟IP请求数限制为50,如下所示:
import com.lambdaworks.redis.RedisClient; import es.moki.ratelimitj.core.LimitRule; RedisClient client = RedisClient.create("redis://localhost"); Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules); boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");
有关Redisconfiguration的详细信息,请参阅https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis 。
原来的问题听起来很像这个博客文章解决的问题: Java多通道asynchronous调节器 。
对于N秒钟内的M个呼叫的速率,本博客中讨论的调速器保证时间线上任何长度为N的时间间隔不会包含多于M个呼叫。
我已经实现了一个简单的限制algorithm。 试试这个链接, http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html
有关algorithm的简要介绍,
该algorithm利用了Java 延迟队列的能力。 用期望的延迟创build一个延迟对象(这里是毫秒TimeUnit的 1000 / M)。 将同一个对象放入延迟的队列中,实习人员将为我们提供移动窗口。 然后,在每个方法调用之前,将对象从队列中取出,take是一个阻塞调用,只有在指定的延迟之后才会返回,并且在方法调用之后不要忘记将对象放入更新时间(此处为毫秒) 。
在这里,我们也可以有不同延迟的多个延迟对象。 这种方法也将提供高吞吐量。
尝试使用这个简单的方法:
public class SimpleThrottler { private static final int T = 1; // min private static final int N = 345; private Lock lock = new ReentrantLock(); private Condition newFrame = lock.newCondition(); private volatile boolean currentFrame = true; public SimpleThrottler() { handleForGate(); } /** * Payload */ private void job() { try { Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98))); } catch (InterruptedException e) { e.printStackTrace(); } System.err.print(" J. "); } public void doJob() throws InterruptedException { lock.lock(); try { while (true) { int count = 0; while (count < N && currentFrame) { job(); count++; } newFrame.await(); currentFrame = true; } } finally { lock.unlock(); } } public void handleForGate() { Thread handler = new Thread(() -> { while (true) { try { Thread.sleep(1 * 900); } catch (InterruptedException e) { e.printStackTrace(); } finally { currentFrame = false; lock.lock(); try { newFrame.signal(); } finally { lock.unlock(); } } } }); handler.start(); }
}
Apache Camel也支持Throttler机制如下:
from("seda:a").throttle(100).asyncDelayed().to("seda:b");
在分布式系统中需要locking时,可以使用redis。 第二个algorithm在https://redis.io/commands/incr
这是上面的LeakyBucket代码的更新。 这适用于每秒1000个请求。
import lombok.SneakyThrows; import java.util.concurrent.TimeUnit; class LeakyBucket { private long minTimeNano; // sec / billion private long sched = System.nanoTime(); /** * Create a rate limiter using the leakybucket alg. * @param perSec the number of requests per second */ public LeakyBucket(double perSec) { if (perSec <= 0.0) { throw new RuntimeException("Invalid rate " + perSec); } this.minTimeNano = (long) (1_000_000_000.0 / perSec); } @SneakyThrows public void consume() { long curr = System.nanoTime(); long timeLeft; synchronized (this) { timeLeft = sched - curr + minTimeNano; sched += minTimeNano; } if (timeLeft <= minTimeNano) { return; } TimeUnit.NANOSECONDS.sleep(timeLeft); } }
和上面的unit testing:
import com.google.common.base.Stopwatch; import org.junit.Ignore; import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; public class LeakyBucketTest { @Test @Ignore public void t() { double numberPerSec = 10000; LeakyBucket b = new LeakyBucket(numberPerSec); Stopwatch w = Stopwatch.createStarted(); IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach( x -> b.consume()); System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS)); } }
看看[TimerTask 1类。 或ScheduledExecutor 。