Tag: apache kafka streams

如何发送时间窗KTable的最终kafkastream聚合结果?

我想要做的是这样的: 从数字主题(Long的)消费logging 聚合(计数)每个5秒窗口的值 将最终的聚合结果发送到另一个主题 我的代码如下所示: KStream<String, Long> longs = builder.stream( Serdes.String(), Serdes.Long(), "longs"); // In one ktable, count by key, on a five second tumbling window. KTable<Windowed<String>, Long> longCounts = longs.countByKey(TimeWindows.of("longCounts", 5000L)); // Finally, sink to the long-avgs topic. longCounts.toStream((wk, v) -> wk.key()) .to("long-counts"); 它看起来像一切正常,但聚合发送到每个传入logging的目标主题。 我的问题是我怎么才能发送每个窗口的最终聚合结果?