网络销售怎么做网站,相关文章 wordpress插件,淘宝关键词排名优化技巧,怎样在设计网站做图赚钱吗背景
处理函数中处理输出主输出的数据流数据外,也可以输出多个其他的副输出的数据流数据#xff0c;当我们的处理函数有副输出时#xff0c;我们需要测试他们功能的正确性#xff0c;本文就提供一个测试flink副输出单元测试的例子
测试flink副输出单元测试
首先看一下处理…背景
处理函数中处理输出主输出的数据流数据外,也可以输出多个其他的副输出的数据流数据当我们的处理函数有副输出时我们需要测试他们功能的正确性本文就提供一个测试flink副输出单元测试的例子
测试flink副输出单元测试
首先看一下处理函数其中包含副输出逻辑
public class MySideOutputProcessFunction extends ProcessFunctionString, String {public static final OutputTagString OUTPUT_TAG new OutputTagString(sideoutput) {};Overridepublic void processElement(String value, Context ctx, CollectorString out) throws Exception {out.collect(normal: value);ctx.output(OUTPUT_TAG, side: value);}
}其次看下对应的单元测试
/*** 测试sideOutput的输出功能*/
Test
public void testSideOutput() throws Exception {MySideOutputProcessFunction mySideOutputProcessFunction new MySideOutputProcessFunction();OneInputStreamOperatorTestHarnessString, String testHarness ProcessFunctionTestHarnesses.forProcessFunction(mySideOutputProcessFunction);testHarness.open();testHarness.processElement(hello, 10);// 测试主输出Assert.assertEquals(Lists.newArrayList(normal:hello), testHarness.extractOutputValues());ConcurrentLinkedQueueStreamRecordString sideOutPutQueue testHarness.getSideOutput(MySideOutputProcessFunction.OUTPUT_TAG);// 测试副输出Assert.assertEquals(Lists.newArrayList(side:hello),sideOutPutQueue.stream().map(StreamRecord::getValue).collect(Collectors.toList()));testHarness.close();
}