甘肃做网站多少钱,怎么做网络推广招聘,网站无法访问的原因,做任务的网站有哪些1.概述
Python API中文文档 本文介绍在阿里云实时计算flink中使用python作业#xff0c;把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发#xff1b; 先说结论: 在实际开发中遇到了很多问题#xff0c;导致python作业基本基本无法…1.概述
Python API中文文档 本文介绍在阿里云实时计算flink中使用python作业把oss中的数据同步数据到阿里云selectdb的过程。python简单的语法特性更适合flink作业的开发 先说结论: 在实际开发中遇到了很多问题导致python作业基本基本无法运行。最后放弃了
python作业中的标量函数的错误没有日志永远是报这个错误:ExceptionInChainedOperatorException: Could not forward element to next operator定位不到具体问题python作业中的用户定义的标量函数基本无法运行。本地测试没有问题的函数提交到flink中就报错。怀疑是环境中没有flink-python.jar自己上传此jar和flink中的包不兼容(阿里云flink和开源版本flink有些jar包不一样)如果各位遇到些问题并且有解决方案麻烦也告知我非常感谢
2.目标
把阿里云sls日志中的数据准实时同步到云服务selectdb
源表flink结果表阿里云sls实时计算flink云服务selectdb
3.步骤
3.1.搭建环境
#**创建虚拟环境essa-flinkpyhton版本为3.11.9
conda create -n essa-flink python3.11.9#**安装apache-flink-1.20版本。安装的依赖比较大指定国内的pip源
pip install apache-flink1.20.0 -i https://pypi.tuna.tsinghua.edu.cn/simple3.2.创建作业
作业代码本身很简单逐行读取sls的日志进行转换后保存到selectdb中。转换函数为do_active_log在本地测试过程中遇到了第一个问题后很轻松愉快就通过了。部署在flink中出现了其它问题
首先是阿里云提供sls连接器(ververica-connector-sls-1.17-vvr-8.0.8.jar)不可用报错缺少com/alibaba/ververica/connectors/sls/source/SLSRecordReader。查看源码确实没有定义此类。提工单后建设使用低版本解决然后报错缺少flink-python不能执行python函数。于是把flink-python上传并在作业中引用依赖最后报错ExceptionInChainedOperatorException: Could not forward element to next无法执行。把作业中函数调用do_active_log删除后正常。提工单后还是没有解决。最后放弃改用jar作业
def do_active_log(row: Row) - Row:用户登录日志处理logging.info(执行do_active_log函数...)params json.loads(row[2])occurred datetime.fromtimestamp(float(row[1]))user_id params[userId]platform params[platform]last_active_time occurredcreate_time occurredid occurred.strftime(%Y%m%d) str(user_id)return Row(str(id), int(user_id), platform, last_active_time, create_time)def create_active_log_sink_table(table_env: StreamTableEnvironment, sink_table: str):创建用户登录日志结果表sql create temporary table {}(id string,user_id int,platform string,last_active_time timestamp,create_time timestamp,primary key(id) not enforced) with (connector doris,fenodes {},table.identifier {},username {},password {},sink.properties.format json).format(sink_table, sink_config[fenodes], sink_config[table.identifier], sink_config[username], sink_config[password])table_env.execute_sql(sql)def get_soruce_datastream(table_env: StreamTableEnvironment):创建datastreamtimes {start_time: , stop_time: }sql create temporary table essa_ubc(ip string,time string,content string,__topic__ string metadata virtual,__source__ string metadata virtual,__timestamp__ string metadata virtual) with (connector sls,endpoint {},accessId {},accessKey {},project {},logstore essa-ubc,startTime {},stopTime {},exitAfterFinish true).format(source_config[sls_endpoint], source_config[access_id], source_config[access_secret],source_config[sls_project], times[start_time], times[stop_time])table_env.execute_sql(sql)source_table table_env.from_path(essa_ubc)return table_env.to_append_stream(source_table, Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING(),Types.STRING(), Types.STRING()]))if __name__ __main__:env StreamExecutionEnvironment.get_execution_environment()t_env StreamTableEnvironment.create(stream_execution_environmentenv)#**加载依赖的jar包t_env.get_config().set(pipeline.jars, 依赖包.jar)#**创建sls源ds get_soruce_datastream(t_env)#**用户登录日志处理#**读取sls日志数据然后使用自定义标量函数处理数据ds ds.filter(lambda d: d[3] activeLog).map(do_active_log, Types.ROW([Types.STRING(), Types.INT(), Types.STRING(),Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP()]))table t_env.from_data_stream(ds)active_log_sink_table user_active_logcreate_active_log_sink_table(t_env, active_log_sink_table)table.execute_insert(active_log_sink_table).wait()