什么是好的速率限制algorithm?

我可以使用一些伪代码,或者更好的Python。 我试图为Python IRC bot实现一个限速队列,它部分工作,但是如果有人触发less于限制的消息(例如,速率限制是每8秒5条消息,并且该人只触发4条消息),并且下一个触发超过8秒(例如,16秒后),机器人发送消息,但是队列变满并且僵尸等待8秒,即使8秒时间过去后不需要。

这里是最简单的algorithm ,如果你想在消息到达太快时(而不是排队它们,因为队列可能会变得很大)而丢弃消息:

rate = 5.0; // unit: messages per = 8.0; // unit: seconds allowance = rate; // unit: messages last_check = now(); // floating-point, eg usec accuracy. Unit: seconds when (message_received): current = now(); time_passed = current - last_check; last_check = current; allowance += time_passed * (rate / per); if (allowance > rate): allowance = rate; // throttle if (allowance < 1.0): discard_message(); else: forward_message(); allowance -= 1.0; 

在这个解决scheme中没有数据结构,定时器等,它的工作原理干净:)为了看到这一点,“津贴”最多以每秒5/8单位的速度增长,即每八秒最多五个单位。 每个转发的消息都会扣除一个单位,因此每八秒钟不能发送超过五条消息。

请注意, rate应该是一个整数,即没有非零小数部分,否则algorithm将无法正常工作(实际费率不会是rate/per )。 例如rate=0.5; per=1.0; rate=0.5; per=1.0; 不起作用,因为allowance永远不会增长到1.0。 但是rate=1.0; per=2.0; rate=1.0; per=2.0; 工作正常。

在排队的函数之前使用这个装饰器@RateLimited(ratepersec)。

基本上,这将检查自上次以来是否已经过1秒,如果不是,则等待剩余的时间,否则不等待。 这有效地限制你的速度/秒。 装饰器可以应用于任何你想要速率限制的function。

在你的情况下,如果你想每8秒钟最多5条消息,在你的sendToQueue函数之前使用@RateLimited(0.625)。

 import time def RateLimited(maxPerSecond): minInterval = 1.0 / float(maxPerSecond) def decorate(func): lastTimeCalled = [0.0] def rateLimitedFunction(*args,**kargs): elapsed = time.clock() - lastTimeCalled[0] leftToWait = minInterval - elapsed if leftToWait>0: time.sleep(leftToWait) ret = func(*args,**kargs) lastTimeCalled[0] = time.clock() return ret return rateLimitedFunction return decorate @RateLimited(2) # 2 per second at most def PrintNumber(num): print num if __name__ == "__main__": print "This should print 1,2,3... at about 2 per second." for i in range(1,100): PrintNumber(i) 

令牌桶实现相当简单。

从具有5个令牌的桶开始。

每5/8秒:如果存储桶less于5个令牌,则添加一个。

每次发送消息:如果存储桶有≥1个标记,则取出一个标记并发送消息。 否则,等待/丢弃消息/不pipe。

(显然,在实际的代码中,你会使用一个整数计数器,而不是真正的令牌,你可以通过存储时间戳来优化每5/8步)


再次读取问题,如果速率限制每8秒完全复位一次,那么这是一个修改:

在很久以前的时间(例如,在时期),以时间戳last_send开始。 另外,从相同的5令牌桶开始。

打击每5/8秒的规则。

每次发送消息:首先,检查last_send是否last_send秒前。 如果是这样,请填充桶(将其设置为5个令牌)。 其次,如果桶中有令牌,则发送消息(否则,drop / wait /等)。 第三,现在设置last_send

这应该适用于这种情况。


我已经写了一个IRC机器人使用这样的策略(第一种方法)。 它的Perl,而不是Python,但这里有一些代码来说明:

这里的第一部分处理向桶中添加令牌。 您可以看到基于时间(第2行到最后一行)添加令牌的优化,然后最后一行将桶内容限制到最大值(MESSAGE_BURST)

  my $start_time = time; ... # Bucket handling my $bucket = $conn->{fujiko_limit_bucket}; my $lasttx = $conn->{fujiko_limit_lasttx}; $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL; ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST; 

$ conn是一个传递的数据结构。 这是一个常规运行的方法(它会计算下一次什么时候需要做什么,以及睡眠时间或直到获得networkingstream量)。 该方法的下一部分处理发送。 这是相当复杂的,因为信息与他们相关的优先事项。

  # Queue handling. Start with the ultimate queue. my $queues = $conn->{fujiko_queues}; foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) { # Ultimate is special. We run ultimate no matter what. Even if # it sends the bucket negative. --$bucket; $entry->{code}(@{$entry->{args}}); } $queues->[PRIORITY_ULTIMATE] = []; 

这是第一个队列,不pipe运行什么。 即使这个连接因洪水而死亡。 用于非常重要的事情,比如响应服务器的PING。 接下来,其余的队列:

  # Continue to the other queues, in order of priority. QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) { my $queue = $queues->[$pri]; while (scalar(@$queue)) { if ($bucket < 1) { # continue later. $need_more_time = 1; last QRUN; } else { --$bucket; my $entry = shift @$queue; $entry->{code}(@{$entry->{args}}); } } } 

最后,存储桶的状态被保存回$ conn数据结构中(实际上稍后在方法中;它首先计算它将在多长时间内完成更多的工作)

  # Save status. $conn->{fujiko_limit_bucket} = $bucket; $conn->{fujiko_limit_lasttx} = $start_time; 

正如你所看到的,实际的桶处理代码是非常小的 – 大约四行。 其余的代码是优先级队列处理。 机器人具有优先级队列,以便例如与之聊天的人不能阻止其执行重要的踢球/禁止任务。

阻塞处理,直到消息可以发送,从而排队进一步的消息,antti的美丽的解决scheme也可以这样修改:

 rate = 5.0; // unit: messages per = 8.0; // unit: seconds allowance = rate; // unit: messages last_check = now(); // floating-point, eg usec accuracy. Unit: seconds when (message_received): current = now(); time_passed = current - last_check; last_check = current; allowance += time_passed * (rate / per); if (allowance > rate): allowance = rate; // throttle if (allowance < 1.0): time.sleep( (1-allowance) * (per/rate)) forward_message(); allowance = 0.0; else: forward_message(); allowance -= 1.0; 

它只是等待,直到有足够的津贴发送消息。 以两倍的速度开始,津贴也可以初始化为0。

保持最后五行发送的时间。 保持排队的消息,直到第五个最近的消息(如果存在的话)过去至less8秒(以last_five作为一个时间数组):

 now = time.time() if len(last_five) == 0 or (now - last_five[-1]) >= 8.0: last_five.insert(0, now) send_message(msg) if len(last_five) > 5: last_five.pop() 

一种解决方法是给每个队列项目添加一个时间戳,并在8秒后丢弃该项目。 您可以在每次添加队列时执行此检查。

这只适用于将队列大小限制为5的情况,并在队列满时放弃所有添加。

如果有人仍然感兴趣,我使用这个简单的可调用类和定时LRU密钥值存储来限制每个IP的请求率。 使用一个deque,但是可以改写用于一个列表。

 from collections import deque import time class RateLimiter: def __init__(self, maxRate=5, timeUnit=1): self.timeUnit = timeUnit self.deque = deque(maxlen=maxRate) def __call__(self): if self.deque.maxlen == len(self.deque): cTime = time.time() if cTime - self.deque[0] > self.timeUnit: self.deque.append(cTime) return False else: return True self.deque.append(time.time()) return False r = RateLimiter() for i in range(0,100): time.sleep(0.1) print(i, "block" if r() else "pass") 

这个怎么样:

 long check_time = System.currentTimeMillis(); int msgs_sent_count = 0; private boolean isRateLimited(int msgs_per_sec) { if (System.currentTimeMillis() - check_time > 1000) { check_time = System.currentTimeMillis(); msgs_sent_count = 0; } if (msgs_sent_count > (msgs_per_sec - 1)) { return true; } else { msgs_sent_count++; } return false; } 

我需要Scala中的变体。 这里是:

 case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) { import Thread.sleep private def now = System.currentTimeMillis / 1000.0 private val (calls, sec) = callsPerSecond private var allowance = 1.0 private var last = now def apply(a: A): B = { synchronized { val t = now val delta_t = t - last last = t allowance += delta_t * (calls / sec) if (allowance > calls) allowance = calls if (allowance < 1d) { sleep(((1 - allowance) * (sec / calls) * 1000d).toLong) } allowance -= 1 } f(a) } } 

以下是如何使用它:

 val f = Limiter((5d, 8d), { _: Unit ⇒ println(System.currentTimeMillis) }) while(true){f(())} 

只是从接受的答案的代码的Python实现。

 import time class Object(object): pass def get_throttler(rate, per): scope = Object() scope.allowance = rate scope.last_check = time.time() def throttler(fn): current = time.time() time_passed = current - scope.last_check; scope.last_check = current; scope.allowance = scope.allowance + time_passed * (rate / per) if (scope.allowance > rate): scope.allowance = rate if (scope.allowance < 1): pass else: fn() scope.allowance = scope.allowance - 1 return throttler