testing风暴螺栓和喷口

这是关于用Java编写的Storm Storm拓扑中的unit testing螺栓和喷口的一般性问题。

什么是推荐的unit testing(JUnit?) 螺栓喷口的做法和指导方针?

例如,我可以为Bolt写一个JUnittesting,但是如果没有完全理解框架(比如Bolt的生命周期)和序列化的含义,很容易造成基​​于构造器的非序列化成员variables的创build错误。 在JUnit中,这个testing会通过,但是在拓扑结构中,这是行不通的。 我完全想象有很多testing点需要考虑(比如序列化和生命周期的例子)。

因此,如果您使用基于JUnit的unit testing,build议您运行一个小的模拟拓扑( LocalMode ?)并testing该拓扑下Bolt (或Spout )的隐含合约? 或者,可以使用JUnit,但是这意味着我们必须仔细模拟Bolt的生命周期(创build它,调用prepare() ,嘲弄一个Config等)。 在这种情况下,被testing的类(Bolt / Spout)要考虑什么样的一般testing点?

其他开发人员在创build适当的unit testing方面做了什么?

我注意到有一个拓扑testingAPI(请参阅: https : //github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java )。 使用一些API是否更好,并为每个BoltSpout站起来“testing拓扑”(并validationBolt必须提供的隐式合约,例如 – 它是宣布的输出)?

谢谢

我们的方法是使用可串行化工厂的构造函数注入到喷口/螺栓中。 喷嘴/螺栓然后以其开放/准备方法咨询工厂。 工厂的单一职责是封装以序列化的方式获得喷口/螺栓的依赖关系。 这种devise允许我们的unit testing注入假冒/testing/模拟工厂,当咨询时返回模拟服务。 通过这种方式,我们可以使用嘲笑(例如Mockito)对狭口单元进行unit testing。

下面是一个螺栓和一个testing的通用例子。 我省略了工厂UserNotificationFactory的实现,因为它取决于您的应用程序。 您可以使用服务定位器来获取服务,序列化configuration,HDFS可访问configuration,或者真正以任何方式获得正确的服务,只要工厂可以在一个serde周期后进行即可。 你应该涵盖该类的序列化。

螺栓

 public class NotifyUserBolt extends BaseBasicBolt { public static final String NAME = "NotifyUser"; private static final String USER_ID_FIELD_NAME = "userId"; private final UserNotifierFactory factory; transient private UserNotifier notifier; public NotifyUserBolt(UserNotifierFactory factory) { checkNotNull(factory); this.factory = factory; } @Override public void prepare(Map stormConf, TopologyContext context) { notifier = factory.createUserNotifier(); } @Override public void execute(Tuple input, BasicOutputCollector collector) { // This check ensures that the time-dependency imposed by Storm has been observed checkState(notifier != null, "Unable to execute because user notifier is unavailable. Was this bolt successfully prepared?"); long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME); notifier.notifyUser(userId); collector.emit(new Values(userId)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields(USER_ID_FIELD_NAME)); } } 

testing

 public class NotifyUserBoltTest { private NotifyUserBolt bolt; @Mock private TopologyContext topologyContext; @Mock private UserNotifier notifier; // This test implementation allows us to get the mock to the unit-under-test. private class TestFactory implements UserNotifierFactory { private final UserNotifier notifier; private TestFactory(UserNotifier notifier) { this.notifier = notifier; } @Override public UserNotifier createUserNotifier() { return notifier; } } @Before public void before() { MockitoAnnotations.initMocks(this); // The factory will return our mock `notifier` bolt = new NotifyUserBolt(new TestFactory(notifier)); // Now the bolt is holding on to our mock and is under our control! bolt.prepare(new Config(), topologyContext); } @Test public void testExecute() { long userId = 24; Tuple tuple = mock(Tuple.class); when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId); BasicOutputCollector collector = mock(BasicOutputCollector.class); bolt.execute(tuple, collector); // Here we just verify a call on `notifier`, but we could have stubbed out behavior befor // the call to execute, too. verify(notifier).notifyUser(userId); verify(collector).emit(new Values(userId)); } } 

从0.8.1版本开始,Storm的unit testing工具已经通过Java公开:

有关如何使用此API的示例,请看这里:

我们采取的一种方法是将大多数应用程序逻辑从螺栓和喷口中移出,然后转换成我们用来通过实例化并通过最小接口使用它们的繁重工作的对象。 然后我们对这些对象和集成testing进行unit testing,虽然这确实留下了一个空白。

事实certificate,相当容易模拟像OutputDeclarer,Tuple和OutputFieldsDeclarer这样的风暴对象。 其中,只有OutputDeclarer会看到任何副作用,所以编码OutputDeclarer模拟类可以回应任何元组和锚。 然后,您的testing类可以使用这些模拟类的实例来轻松configuration螺栓/喷口实例,调用它并validation预期的副作用。