门户网站怎么做,中山网站建设seo优化营销制作设计,宁波网站建设公司网络推广,做鞋子的招聘网站有哪些说明
这应该是进一步的完善ADBS的工作模式。
之所以做A系列的架构工具#xff0c;就是为了可以实现大型的数据处理、存储。从应用上说#xff0c;是为了提高效率#xff0c;并达到超高的效果。
为了达到这个目的#xff0c;就必须从数据架构上、任务调度上、逻辑架构上作…说明
这应该是进一步的完善ADBS的工作模式。
之所以做A系列的架构工具就是为了可以实现大型的数据处理、存储。从应用上说是为了提高效率并达到超高的效果。
为了达到这个目的就必须从数据架构上、任务调度上、逻辑架构上作出好的设计并将之实现。逻辑架构主要对应的就是Core的设计目前初步的实现了ETL模型的还没有去实现但有了Core和ETL的经验那么模型只是另一种形态和时间问题。
但无论如何数据架构一定是所有应用的基础所以第一步也就是实现了数据架构。ADBS是基于Mongo和Redis搭建的一套可适应高并发并支持多Worker并行执行的数据库流转体系目前看来效果是很理想的单核日吞吐可以达到3000万条数据以上。
因为单步的ADBS已经包含了包括流转、监控、分发在内的一系列程序(sniffer, io型)单个ADBS已经具备了很强的独立服务能力而如果要进行定制化修改会略显麻烦。 所以从应用于结构可靠性的角度上考虑我倾向于使用「简单结构多层迭代」。这也是从计算机发展史里得到的教训简单的结构迭代/叠加才可能实现真正复杂的功能。
本次的内容就是在进行ADBS之间连接时sniffer的动作。
内容
1 Sniffer的取数场景
广义上来说ADBS里除了Worker其他都是Sniffer。不过这里特指进行取数衔接的程序部分。
我大致想了一下Sniffer可能有几个取数场景
1 从数据库取数。这在之前的项目服务类场景下常见需要我向数据库发起Range或者Set查询来获取要处理的数据。2 文件取数。例如这次我会手动下载510300的离线文件然后由Sniffer驱动取到第一个ADBS3 ADBS取数。也就是本次讨论的内容从ADBS取数。本质上也是从数据库取数但是由于ADBS存在一些规范所以取数的模式可以比较固定。另外就是未来ADBS之间衔接必然是不可少的一部分所以特殊独立出来。
2 取数模式
某个step_out的数据如下 step_out提供了默认的任务通道_ch001在sniffer取数时需要根据这个通道的状态进行查询、ACK。
一般情况下Sniffer取数后要立即ACK状态1这样避免其他Sniffer重复的获取数据在最后一步的时候ACK2或者3表示任务的完成状态。有时候数据不满足条件这样会导致计算失败。此时会有再次巡查Sniffer检查到超时会将任务重启初始化。当然超时重试也有次数限制。
初次请求
1 请求数据【是否为空】【是否满足可用条件】2 * 回应【ACK】【如果是数据库取数】3 根据规则判断下一步可行性【队列是否溢出】4 执行具体操作 【增删改】5 进行回应【ACK】【异常上报】
再次巡查
1 检查数据是否认领超时2 将数据的通道字段翻回0
在这里加入一个限定只有一个Sniffer向Step Out(Mongo)发起取数。
加入这个限定后请求过程会得到简化但是这样合理吗
在ADBS中任务的分发是通过Redis的Stream完成的天然的分发方式。所以Sniffer的这个变化不影响并发处理。
并且对于IO来说单个Sniffer可以吞吐的速度已经足够快了。每秒1万条就已经超过单核ADBS一天吞吐量的好几倍了。
所以结论是可行。
加入限定后流程变得简单 1 请求数据 执行两个判定数据是否为空、数据是否满足特定要求(基础要求) 2 判断目标队列是否会溢出 3 根据2的结果决定是否执行操作 4 根据3的结果决定是否ACK
这样就不用考虑并行时的抢占也不必考虑考虑巡检重置超时的问题。
具体的做法其实可以参考StreamsIO.M2S的方法当时只是考虑在本ADBS中将Mongo数据拉到工作队列并记录日志。现在的差别是读取数据和日志不是同一个ADBS。
实例
这个Sniffer运行在MyQuantBaseStep2Signals向MyQuantBase.step1_mongo_out发起取数。
1 Sniffer实际运行的ADBS是MyQuantBaseStep2Signals。2 Sniffer请求的源是MyQuantBase.step1_mongo_out3 Sniffer记录的log是在MyQuantBaseStep2Signals4 Sniffer的目标是 MyQuantBaseStep2Signals.
需要的一些参数
1 redis服务地址2 超时设置3 批次取数量4 源mongo的tier1和tier25 当前mongo记录日志的tier1和tier26 当前所使用的通道7 目标队列的最大长度
代码
from funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e import *from configs_base import redis_agent_host,project_name,cur_w
from configs_base import color_print,step1_stream_in# 判断队列是否可以插入
def is_q_available(stream_name, maxlen 100000, new_task_len 10000, redis_agent None,connection_hash None ):cur_redis_agent redis_agentcur_len_resp req.post(cur_redis_agent len_of_queue/,json {stream_name:stream_name,connection_hash:connection_hash}).json()if cur_len_resp[status]:cur_len cur_len_resp[data]if cur_len new_task_len maxlen:return False else:return True else:print(Connection Error)return False # 基于并发方法向数据库存数【队列Write相关-写入消息】- 其实是使用pipeline - 最好单次一万左右
def parrallel_write_msg(stream_name, data_listofdict None ,maxlen None, time_out None,redis_agent None,connection_hash None,is_return_msg_id_listFalse):cur_redis_agent redis_agentcur_maxlen maxlen or 100000# 默认十秒超时time_out time_out or 30print( 并发写Stream)tick11 time.time()resp_dict req.post(cur_redis_agent batch_add_msg/,json {connection_hash:connection_hash,stream_name:stream_name,msg_dict_list:data_listofdict,maxlen:cur_maxlen,is_return_msg_id_list:is_return_msg_id_list},timeouttime_out).json()tick13 time.time()print(写入任务数据 {:.2f}.format(tick13 -tick11))return resp_dict# 回应
def ack_mongo(w None,tier1 None, tier2 None, key_list None,keyname None, channel_name None, channel_val None ):cur_w w or self.w var_list [tier1, tier2, key_list, keyname, channel_name, channel_val]assert all(var_list), ,.join(var_list)参数不可为空filter_list [{keyname:{$in:key_list}}]attr_list [{channel_name:channel_val}]inc_list [{channel_name _cnt: 1}]return cur_w.update_with_inc(tier1 tier1, tier2 tier2, filter_list filter_list, attr_list attr_list, inc_list inc_list)# Modify 2023.01.10
# 【基础定义区-常变】
cur_machine get_machine_name()
print(Current Machine, cur_machine)redis_agent_host http://172.17.0.1:24021/
redis_connection_hash None# 这个sniffer盯的是上一个ADBS的输出
# source
source_server m7.24065
source_tier1 MyQuantBase
source_tier2 step1_mongo_out
gs_id rec_idcurrent_tier1 project_namemarket SH
code 510300
start_slot 26299291# 这个可以自由定义这里我用了24000最大周期 之前有一部分误写入的部分
burnt_slots 20000
batch_num 10000
# 目标队列允许的最大长度
target_q_max_len 100000
target_q_name %s.%s % (current_tier1, step1_stream_in)
sniffer_name sniffer01_query_step1_result
keyname gs_id
channel_name _ch001
custom_filter_list [{market:market,code:code}]
default_filter_list [{_is_enable:1, channel_name:0}]
# 数据连接操作不得超过30秒
db_connect_ttl 30try:source_w from_pickle(source_w_ source_tier1)color_print(【Loading source_w】from pickle)
except:w WMongo(w)source_w w.TryConnectionOnceAndForever(server_name source_server)to_pickle(source_w, source_w_ source_tier1)# 操作
msg
log_tier1 current_tier1
log_tier2 log_sniffertick1 time.time()# 判断队列是否可以写入
is_target_q_available is_q_available(target_q_name,maxlen target_q_max_len, new_task_len batch_num, redis_agent redis_agent,connection_hash redis_connection_hash)cur_len_resp req.post(redis_agent len_of_queue/,json {stream_name:target_q_name,connection_hash:redis_connection_hash}).json()q_len cur_len_resp[data]
print({} Q has {} Messages .format(target_q_name,q_len))# 如果目标队列满
if not is_target_q_available:msg target q is full {} ,{}.format(q_len, qname)if is_target_q_available:tick100 time.time()color_print( fetching from Mongo )recs source_w.query_recs(tier1 source_tier1, tier2 source_tier2, filter_dict {$and:default_filter_list custom_filter_list}, silentTrue, limits batch_num, sort_tuple_list[(channel_name,1)])print(Spends {:.2f} .format(time.time()-tick100))rec_num len(recs[data])if rec_num:data_listofdict recs[data]tick101 time.time()color_print( Writing To Stream )write_resp parrallel_write_msg(target_q_name, data_listofdict data_listofdict ,maxlen target_q_max_len, time_out db_connect_ttl,redis_agent redis_agent,connection_hash redis_connection_hash, is_return_msg_id_listTrue)print(Spends {:.2f} .format(time.time()-tick101))# 假设全部成功如果有失败的最终会被发现超时successful_keyname list(pd.DataFrame(data_listofdict)[keyname]) # ack - 成功ack_res ack_mongo( w source_w,tier1 source_tier1 , tier2 source_tier2, key_list successful_keyname ,keyname keyname, channel_name channel_name, channel_val 2 )if ack_res[status]:msg ok,{} of {} , {} .format(len(successful_keyname),rec_num, target_q_name)else:msg error,ack mongo {} recs of {} .format(rec_num, target_q_name)else:msg no source data {} .format(target_q_name)tick2 time.time()
duration round(tick2 -tick1,2)
# -- log
log_dict {sniffer: sniffer_name,duration:duration,msg: msg }cur_w.insert_recs(tier1log_tier1, tier2log_tier2, data_listofdict [log_dict])
# 操作 END
source mongo: 目标队列 代码比较长改造成本还可以花费的时间比较少。能够越来越多的基于简单复用肯定是好的我的Web编辑平台搞好后应该可以让这种复用更容易最好再加个问答推荐系统。
改造的部分包括
1 将is_q_available、parrallel_write_msg、ack_mongo从对象里抽出来改造为普通函数2 匹配并校准源和当前日志WMongo连接3 将 M2S的流程从对象里抽出来写在sniffer的程序体内
之后其他的ADBS均可以仿照此例连接。