枣庄高端品牌网站建设案例,百度热搜广告位,今天重大新闻头条新闻,节点网站背景
在flink系统中#xff0c;我们为了补充某个流事件成一个完整的记录#xff0c;经常需要调用外部接口获取一些配置数据#xff0c;流事件结合这些配置数据就可以组合成一条完整的记录#xff0c;然而如果同步调用外部系统接口来实现#xff0c;那么会有很大的性能瓶颈…背景
在flink系统中我们为了补充某个流事件成一个完整的记录经常需要调用外部接口获取一些配置数据流事件结合这些配置数据就可以组合成一条完整的记录然而如果同步调用外部系统接口来实现那么会有很大的性能瓶颈这种情况下我们一般会使用异步函数提高性能本文就来记录下使用异步函数的几个注意事项
异步函数的使用
首先看一下官方的例子
/*** 实现 AsyncFunction 用于发送请求和设置回调。*/
class AsyncDatabaseRequest extends RichAsyncFunctionString, Tuple2String, String {/** 能够利用回调函数并发发送请求的数据库客户端 */private transient DatabaseClient client;Overridepublic void open(Configuration parameters) throws Exception {client new DatabaseClient(host, post, credentials);}Overridepublic void close() throws Exception {client.close();}Overridepublic void asyncInvoke(String key, final ResultFutureTuple2String, String resultFuture) throws Exception {// 发送异步请求接收 future 结果final FutureString result client.query(key);// 设置客户端完成请求后要执行的回调函数// 回调函数只是简单地把结果发给 futureCompletableFuture.supplyAsync(new SupplierString() {Overridepublic String get() {try {return result.get();} catch (InterruptedException | ExecutionException e) {// 显示地处理异常。return null;}}}).thenAccept( (String dbResult) - {resultFuture.complete(Collections.singleton(new Tuple2(key, dbResult)));});}
}// 创建初始 DataStream
DataStreamString stream ...;// 应用异步 I/O 转换操作
DataStreamTuple2String, String resultStream AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
注意事项如下 1.在asyncinvoke方法中不能有阻塞的操作比如这里仅仅是使用Future.thenAccept注册一个回调返回后的处理逻辑而不会使用Future.get方法进行阻塞操作 2.AsyncDataStream.orderWait和AsyncDataStream.unorderWait方法都能正确的事件时间也就是说即使是AsyncDataStream.unorderWait,它也能保证记录不会被之后的水位线超越 3.异步函数可以和检查点机制进行集成也就是那些正在等待响应结果的记录会被写入检查点中当故障恢复后可以重新发送请求 4.如果服务端没有提供异步的客户端我们可以用多线程进行模拟只要多线程返回future对象即可 5.使用AsyncDataStream可以限制并发数以及如何进行超时处理等