节stream方法在N秒钟内调用M个请求

我需要一个组件/类,在N秒(或毫秒或毫微秒,无所谓)中将某些方法的执行限制为最大M个调用。

换句话说,我需要确保我的方法在N秒的滑动窗口中执行不超过M次。

如果你不知道现有的类可以自由发布你的解决scheme/想法你将如何实现这一点。

我会用一个固定大小为M的时间戳环形缓冲区 。每次调用方法时,都会检查最早的入口,如果过去less于N秒,则执行并添加另一个入口,否则,您将入睡为时差。

Google Guava RateLimiter是我开箱即用的。

// Allow one request per second private RateLimiter throttle = RateLimiter.create(1.0); private void someMethod() { throttle.acquire(); // Do something } 

具体而言,你应该可以用DelayQueue来实现这个。 用M Delayed实例初始化队列,延迟最初设置为零。 当对方法的请求进来时, take一个令牌,导致方法阻塞,直到达到节stream要求。 当一个令牌已经被采取, add一个新的令牌到延迟N的队列。

阅读令牌桶algorithm。 基本上,你有一个桶里有令牌。 每次执行该方法时,都需要一个令牌。 如果没有更多的令牌,则阻塞,直到获得一个令牌。 同时,有一些外部的演员以固定的时间间隔补充代币。

我不知道有一个图书馆做这个(或类似的)。 你可以把这个逻辑写入代码中,或者使用AspectJ来添加行为。

这取决于应用程序。

想象一下, multithreading想让一个令牌做全局速率限制的动作 ,而不允许爆发 (即你想每10秒限制10个动作,但是你不想在第一个动作发生10个动作,然后保持9秒停止)。

DelayedQueue有一个缺点:线程请求令牌的顺序可能不是它们完成请求的顺序。 如果多个线程被阻塞等待一个令牌,那么不清楚哪一个线程将获取下一个可用的令牌。 在我看来,你甚至可以有线程永久等待。

一种解决方法是在两次连续的动作之间一个最小的时间间隔 ,并按照请求的相同顺序采取行动。

这是一个实现:

 public class LeakyBucket { protected float maxRate; protected long minTime; //holds time of last action (past or future!) protected long lastSchedAction = System.currentTimeMillis(); public LeakyBucket(float maxRate) throws Exception { if(maxRate <= 0.0f) { throw new Exception("Invalid rate"); } this.maxRate = maxRate; this.minTime = (long)(1000.0f / maxRate); } public void consume() throws InterruptedException { long curTime = System.currentTimeMillis(); long timeLeft; //calculate when can we do the action synchronized(this) { timeLeft = lastSchedAction + minTime - curTime; if(timeLeft > 0) { lastSchedAction += minTime; } else { lastSchedAction = curTime; } } //If needed, wait for our time if(timeLeft <= 0) { return; } else { Thread.sleep(timeLeft); } } } 

虽然这不是你所要求的,但是ThreadPoolExecutor也是有用的,它被devise用来在M秒内同时请求而不是M个请求。

我需要确保我的方法在N秒的滑动窗口中执行不超过M次。

我最近写了一篇博客文章,介绍如何在.NET中做到这一点。 您可能可以在Java中创build类似的东西。

.NET中更好的速率限制[Penned Objects]

如果您需要基于Java的滑动窗口速率限制器,可以在分布式系统上运行,那么您可能需要查看https://github.com/mokies/ratelimitj项目。;

Redis支持的configuration,将每分钟IP请求数限制为50,如下所示:

 import com.lambdaworks.redis.RedisClient; import es.moki.ratelimitj.core.LimitRule; RedisClient client = RedisClient.create("redis://localhost"); Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules); boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2"); 

有关Redisconfiguration的详细信息,请参阅https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis

原来的问题听起来很像这个博客文章解决的问题: Java多通道asynchronous调节器 。

对于N秒钟内的M个呼叫的速率,本博客中讨论的调速器保证时间线上任何长度为N的时间间隔不会包含多于M个呼叫。

我已经实现了一个简单的限制algorithm。 试试这个链接, http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html

有关algorithm的简要介绍,

该algorithm利用了Java 延迟队列的能力。 用期望的延迟创build一个延迟对象(这里是毫秒TimeUnit的 1000 / M)。 将同一个对象放入延迟的队列中,实习人员将为我们提供移动窗口。 然后,在每个方法调用之前,将对象从队列中取出,take是一个阻塞调用,只有在指定的延迟之后才会返回,并且在方法调用之后不要忘记将对象放入更新时间(此处为毫秒) 。

在这里,我们也可以有不同延迟的多个延迟对象。 这种方法也将提供高吞吐量。

尝试使用这个简单的方法:

 public class SimpleThrottler { private static final int T = 1; // min private static final int N = 345; private Lock lock = new ReentrantLock(); private Condition newFrame = lock.newCondition(); private volatile boolean currentFrame = true; public SimpleThrottler() { handleForGate(); } /** * Payload */ private void job() { try { Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98))); } catch (InterruptedException e) { e.printStackTrace(); } System.err.print(" J. "); } public void doJob() throws InterruptedException { lock.lock(); try { while (true) { int count = 0; while (count < N && currentFrame) { job(); count++; } newFrame.await(); currentFrame = true; } } finally { lock.unlock(); } } public void handleForGate() { Thread handler = new Thread(() -> { while (true) { try { Thread.sleep(1 * 900); } catch (InterruptedException e) { e.printStackTrace(); } finally { currentFrame = false; lock.lock(); try { newFrame.signal(); } finally { lock.unlock(); } } } }); handler.start(); } 

}

Apache Camel也支持Throttler机制如下:

 from("seda:a").throttle(100).asyncDelayed().to("seda:b"); 

在分布式系统中需要locking时,可以使用redis。 第二个algorithm在https://redis.io/commands/incr

这是上面的LeakyBucket代码的更新。 这适用于每秒1000个请求。

 import lombok.SneakyThrows; import java.util.concurrent.TimeUnit; class LeakyBucket { private long minTimeNano; // sec / billion private long sched = System.nanoTime(); /** * Create a rate limiter using the leakybucket alg. * @param perSec the number of requests per second */ public LeakyBucket(double perSec) { if (perSec <= 0.0) { throw new RuntimeException("Invalid rate " + perSec); } this.minTimeNano = (long) (1_000_000_000.0 / perSec); } @SneakyThrows public void consume() { long curr = System.nanoTime(); long timeLeft; synchronized (this) { timeLeft = sched - curr + minTimeNano; sched += minTimeNano; } if (timeLeft <= minTimeNano) { return; } TimeUnit.NANOSECONDS.sleep(timeLeft); } } 

和上面的unit testing:

 import com.google.common.base.Stopwatch; import org.junit.Ignore; import org.junit.Test; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; public class LeakyBucketTest { @Test @Ignore public void t() { double numberPerSec = 10000; LeakyBucket b = new LeakyBucket(numberPerSec); Stopwatch w = Stopwatch.createStarted(); IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach( x -> b.consume()); System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS)); } } 

看看[TimerTask 1类。 或ScheduledExecutor 。

Interesting Posts