供热设施网站搭建教程,网站建设的关键事项,05网学霸答案,吴江住房和城乡建设局官方网站背景
我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试，以确保上线之前这个函数的功能是正常的，包括里面的广播状态和键值分区状态。
测试KeyedBroadcastProcessFunction类 Testpublic void testHarnessForKeyedBroadcastProcessFunction()…背景
我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试，以确保上线之前这个函数的功能是正常的，包括里面的广播状态和键值分区状态。
测试KeyedBroadcastProcessFunction类 Testpublic void testHarnessForKeyedBroadcastProcessFunction() throws Exception {KeyedBroadcastProcessFunctionString, String, String, String function new MyKeyedBroadcastProcessFunction();// 键值分区状态final ValueStateDescriptorString valueStateDescriptor new ValueStateDescriptor(item, BasicTypeInfo.STRING_TYPE_INFO);// 广播状态final MapStateDescriptorString, String ruleStateDescriptor new MapStateDescriptor(RulesBroadcastState,BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);KeyedBroadcastOperatorTestHarnessString, String, String, String harness ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(function, x - x,TypeInformation.of(String.class), ruleStateDescriptor);harness.processBroadcastElement(0, 1);harness.processBroadcastElement(000, 2);harness.processElement(1, 10);// 判断键值分区状态(注意这里最好就只是某个key下面也就是分组key直接设置为x-固定常数值即可)ValueStateString valueState function.getRuntimeContext().getState(valueStateDescriptor);Assert.assertEquals(valueState.value(), 1);// 判断广播状态BroadcastStateString, String broadcastState harness.getBroadcastState(ruleStateDescriptor);Assert.assertTrue(broadcastState.contains(0));Assert.assertTrue(broadcastState.contains(000));// 判断输出的列表Assert.assertEquals(harness.extractOutputValues(), Arrays.asList(0, 000, 1));}关键代码 1.获取键值分区状态
ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);
2.获取广播状态:
BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);
3.工具类
public class ProcessFunctionTestHarnesses {public ProcessFunctionTestHarnesses() {}public static IN, OUT OneInputStreamOperatorTestHarnessIN, OUT forProcessFunction(ProcessFunctionIN, OUT function) throws Exception {OneInputStreamOperatorTestHarnessIN, OUT testHarness new OneInputStreamOperatorTestHarness(new ProcessOperator((ProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.setup();testHarness.open();return testHarness;}public static K, IN, OUT KeyedOneInputStreamOperatorTestHarnessK, IN, OUT forKeyedProcessFunction(KeyedProcessFunctionK, IN, OUT function, KeySelectorIN, K keySelector, TypeInformationK keyType) throws Exception {KeyedOneInputStreamOperatorTestHarnessK, IN, OUT testHarness new KeyedOneInputStreamOperatorTestHarness(new KeyedProcessOperator((KeyedProcessFunction)Preconditions.checkNotNull(function)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static IN1, IN2, OUT TwoInputStreamOperatorTestHarnessIN1, IN2, OUT forCoProcessFunction(CoProcessFunctionIN1, IN2, OUT function) throws Exception {TwoInputStreamOperatorTestHarnessIN1, IN2, OUT testHarness new TwoInputStreamOperatorTestHarness(new CoProcessOperator((CoProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.open();return testHarness;}public static K, IN1, IN2, OUT KeyedTwoInputStreamOperatorTestHarnessK, IN1, IN2, OUT forKeyedCoProcessFunction(KeyedCoProcessFunctionK, IN1, IN2, OUT function, KeySelectorIN1, K keySelector1, KeySelectorIN2, K keySelector2, TypeInformationK keyType) throws Exception {KeyedTwoInputStreamOperatorTestHarnessK, IN1, IN2, OUT testHarness new KeyedTwoInputStreamOperatorTestHarness(new KeyedCoProcessOperator((KeyedCoProcessFunction)Preconditions.checkNotNull(function)), keySelector1, keySelector2, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static IN1, IN2, OUT BroadcastOperatorTestHarnessIN1, IN2, OUT forBroadcastProcessFunction(BroadcastProcessFunctionIN1, IN2, OUT 
function, MapStateDescriptor?, ?... descriptors) throws Exception {BroadcastOperatorTestHarnessIN1, IN2, OUT testHarness new BroadcastOperatorTestHarness(new CoBroadcastWithNonKeyedOperator((BroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), 1, 1, 0);testHarness.open();return testHarness;}public static K, IN1, IN2, OUT KeyedBroadcastOperatorTestHarnessK, IN1, IN2, OUT forKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunctionK, IN1, IN2, OUT function, KeySelectorIN1, K keySelector, TypeInformationK keyType, MapStateDescriptor?, ?... descriptors) throws Exception {KeyedBroadcastOperatorTestHarnessK, IN1, IN2, OUT testHarness new KeyedBroadcastOperatorTestHarness(new CoBroadcastWithKeyedOperator((KeyedBroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}
}