如何发送时间窗KTable的最终kafkastream聚合结果?

我想要做的是这样的:

  1. 从数字主题(Long的)消费logging
  2. 聚合(计数)每个5秒窗口的值
  3. 将最终的聚合结果发送到另一个主题

我的代码如下所示:

KStream<String, Long> longs = builder.stream( Serdes.String(), Serdes.Long(), "longs"); // In one ktable, count by key, on a five second tumbling window. KTable<Windowed<String>, Long> longCounts = longs.countByKey(TimeWindows.of("longCounts", 5000L)); // Finally, sink to the long-avgs topic. longCounts.toStream((wk, v) -> wk.key()) .to("long-counts"); 

它看起来像一切正常,但聚合发送到每个传入logging的目标主题。 我的问题是我怎么才能发送每个窗口的最终聚合结果?

在Kafka Streams中,没有像“最终聚合”这样的东西。 Windows始终保持开放,以处理迟到的logging(当然,窗口不会永久保存,直到保留时间到期,它们才会被丢弃 – 但是,当窗口被丢弃时,没有特别的操作)。

有关更多详细信息,请参阅Confluent文档: http : //docs.confluent.io/current/streams/

因此,对于聚合的每个更新,都会生成结果logging(因为Kafkastream也会在迟到的logging上更新聚合结果)。 您的“最终结果”将是最新的结果logging(在窗口被丢弃之前)。 根据您的使用情况,手动重复数据删除是解决问题的一种方法(使用较低的杠杆API, transform()process()

这个博客文章可能也有帮助: https : //timothyrenner.github.io/engineering/2016/08/11/kafka-streams-not-looking-at-facebook.html