网站保定网站建设多少钱,学做网站论坛vip账户,杭州 建设网站制作,江苏网站开发建设电话说明
之前尝试用FastAPI来构造规则#xff0c;碰到的问题是由于请求量过大(TPS 1000), 从而导致微服务端口资源耗尽。所以现在的point是:
1 如何使用函数来替代微服务(同时要保留使用微服务的优点)2 进一步抽象并规范规则的执行3 等效合并规则的方法
内容
0 机制讨论…说明
之前尝试用FastAPI来构造规则碰到的问题是由于请求量过大(TPS 1000), 从而导致微服务端口资源耗尽。所以现在的point是:
1 如何使用函数来替代微服务(同时要保留使用微服务的优点)2 进一步抽象并规范规则的执行3 等效合并规则的方法
内容
0 机制讨论
过去在使用tornado作为后端服务的时候是没有碰到端口耗尽的问题的也许是tornado本身采取的是长连接更适合大批量数据请求的后端任务。 FastAPI更适合做短、平的IO类异步需求不可以用于级联TPS大约400-1200的样子。 这次的业务场景是实体匹配我们需要从原文中提取出实体然后完成匹配。
数据样本:
ent_list [基金, 美芯晟, 高新兴, 骏成科技, 证券时报, 深市主板, 创业板, 沪市, 科创板, 计算机,
机械设备, 共有3, 潍柴动力, 乐心医疗, 嘉曼服饰, 敏芯股份, 渝开发, 长虹美菱, 德联集团, 数据宝,
中航西飞, 顺络电子, 基金家数, 华利集团, 杰瑞股份, 邦彦技术, 兴瑞科技, 深天马, 漫步,
金力永磁, 太阳能, 普蕊斯, 德方纳米, 华锐精密, 伊之密, 西子洁能, 陕西华达, 浙江鼎力, 诺瓦星云, 远光]original_text_half_width 昨日基金共对31家公司进行调研扎堆调研美芯晟、高新兴、骏成科技等。证券时报•数据宝统计6月5日共40家公司被机构调研按调研机构类型看基金参与31家公司的调研活动其中10家以上基金扎堆调研公司共6家。美芯晟最受关注参与调研的基金达27家高新兴、骏成科技等分别获18家、15家基金集体调研。基金参与调研的公司中按所属板块统计深市主板公司有13家创业板公司有13家沪市主板公司有1家科创板公司有4家。所属行业来看基金调研的公司共涉及13个行业所属电子行业最多有7家公司上榜计算机、机械设备等紧随其后分别有4家、4家公司上榜。从基金调研公司的A股总市值统计总市值在500亿元以上的共有3家其中总市值超千亿元的有潍柴动力等总市值不足100亿元的有17家分别是乐心医疗、嘉曼服饰、敏芯股份等。市场表现上基金调研股中近5日上涨的有10只涨幅居前的有敏芯股份、高新兴、骏成科技等涨幅为21.46%、19.43%、13.83%下跌的有21只跌幅居前的有渝开发、长虹美菱、德联集团等跌幅为12.31%、9.92%、9.22%。数据宝统计基金参与调研股中近5日资金净流入的有12只中航西飞近5日净流入资金1.53亿元主力资金净流入最多净流入资金较多的还有高新兴、顺络电子等净流入资金分别为8217.28万元、3140.67万元。数据宝6月5日基金调研公司一览代码简称基金家数最新收盘价元近5日涨跌幅%行业688458美芯晟27 35.85 -4.48电子300098高新兴18 3.75 19.43计算机301106骏成科技15 32.42 13.83电子002138顺络电子14 25.27 7.76电子300979华利集团14 67.90 1.19纺织服饰300803指南针13 42.92 -0.60计算机002353杰瑞股份9 34.33 -2.05机械设备301276嘉曼服饰8 22.71 -0.74纺织服饰688132邦彦技术8 18.55 5.64国防军工688286敏芯股份6 44.60 21.46电子000338潍柴动力6 15.74 -2.24汽车002937兴瑞科技5 20.93 1.45电子000514渝开发4 3.49 -12.31房地产000050深天马A4 7.31 -2.40电子002351漫步者3 12.73 -0.08电子300748金力永磁2 14.06 -2.77有色金属000591太阳能2 5.05 -6.31公用事业301257普蕊斯2 40.05 -4.53医药生物000768中航西飞2 25.23 4.69国防军工300769德方纳米2 33.88 -2.98电力设备300562乐心医疗1 8.61 -6.62医药生物688059华锐精密1 54.53 -6.79机械设备002666德联集团1 3.94 -9.22基础化工300415伊之密1 22.34 1.79机械设备002534西子洁能1 10.99 -4.18电力设备301517陕西华达1 61.60 2.56国防军工301362民爆光电1 34.00 -7.34家用电器603338浙江鼎力1 61.86 -4.93机械设备301589诺瓦星云1 219.68 -8.31计算机000521长虹美菱1 8.90 -9.92家用电器002063远光软件1 5.70 -1.55计算机注本文系新闻报道不构成投资建议股市有风险投资需谨慎。在逻辑上我们会按照实际情况设计分级在程序上我们要有一个合并的逻辑。这种逻辑要简单不要offend逻辑。 1 现有的服务
采用“WaterFall”的方法逐步批量的处理并分流数据。
一条规则是如此
# reject
app.post(/r000/)
async def r000(justent:JustEnt):the_ent justent.some_entthe_result RuleResult()try:if judge_existence(the_ent, word_listr0_exe_clude_list):the_result.status rejectelse:the_result.status passreturn the_result.dict()except Exception as e:raise HTTPException(status_code400, detailstr(e))
在发起调用时,采用异步方式每次根据请求的目标先品出参数然后将渠道的结果进行解析。
import time
def waterfall_api_mode(last_fall, rule_name ,reject_list None, get_list None, mappling_list None, raw None , base_url None):next_fall []last_ent_list last_fall pure_rule_url rule_name /if len(last_ent_list):rule_url base_url pure_rule_url# api modetick1 time.time()task_list []for ent in last_ent_list:tem_dict {}tem_dict[task_id] ent tem_dict[url] rule_urlif raw is None :tem_dict[json_params] {some_ent:ent}else:tem_dict[json_params] {some_ent:ent,raw:raw}task_list.append(tem_dict)rule_res asyncio.run(json_player(task_list, concurrent 10))# 解析结果保留passfor tem_res in rule_res:for k,v in tem_res.items():# print(k,v)if v[status] pass:next_fall.append(k)elif v[status] get:if get_list is not None :get_list.append(v[data])if mappling_list is not None :mappling_list.append({ent:k,mapping_ent: v[data]})elif v[status] reject:if reject_list is not None :reject_list.append(k)tick2 time.time()print(takes %.2f %(tick2-tick1))return next_fall在批量调用规则时,采用几乎一样的形式即可这是非常简洁的地方。 # fall of short # r100_1next_fall_short waterfall_api_mode(next_fall_short, r100_1,base_url base_url)# r100next_fall_short waterfall_api_mode(next_fall_short, r100, get_list mr.short_result,mappling_listmr.mapping_list ,base_url base_url)# r102next_fall_short waterfall_api_mode(next_fall_short, r102, get_list mr.short_result,mappling_listmr.mapping_list ,base_url base_url)# r102_1next_fall_short waterfall_api_mode(next_fall_short, r102_1, get_list mr.short_result,mappling_listmr.mapping_list ,base_url base_url)# r103next_fall_short waterfall_api_mode(next_fall_short, r103, raw original_text_half_width,base_url base_url)# r104next_fall_short waterfall_api_mode(next_fall_short, r104,base_url base_url)# r105next_fall_short waterfall_api_mode(next_fall_short, r105,base_url base_url)# r106next_fall_short waterfall_api_mode(next_fall_short, r106,base_url base_url)# r107next_fall_short waterfall_api_mode(next_fall_short, r107,base_url base_url)# r200next_fall_short waterfall_api_mode(next_fall_short, r200,base_url base_url)# r201next_fall_short waterfall_api_mode(next_fall_short, r201,base_url base_url)# r202next_fall_short waterfall_api_mode(next_fall_short, r202,base_url base_url)# r203next_fall_short waterfall_api_mode(next_fall_short, r203,base_url base_url)# r299next_fall_short waterfall_api_mode(next_fall_short, r299, get_list mr.short_result,mappling_listmr.mapping_list,base_url base_url )
觉得还不错需要保持的地方
1 数据规范。使用pydantic这个可以继续保持2 waterfall_api_mode 可以作为waterfall_func_mode, 且这次可以规定输出为4部分get、pass、reject、error3 执行时每条规则除了顺序之外应该还有层次实现BFS。规则分为若干模式例如 is_in , is_not_in, 在每个层之间的同类规则可以合并。
2 设计与改进
诶我突然想到了我的APIFunc。 总体上说这个框架还是比较强大的但是非常僵化所以最终没有走向实际应用。所以我觉得完全可以进行拆解重构。当然里面有一部分问题的解决还是蛮厉害的反正这一会我想不出来。
有几块内容是需要添加上的
1 logging对象灵活的进行记录后续会和logstash结合在一起(ELK)2 错误输出遇到错误时发送到kafka3 shortuuid: 每次处理会生成一个shortuuid用于追溯代表一次会话之内的
修改的部分
1 原来有很多数据的校验部分现在可以用pydantic来控制2 BFS替代逐个的链式3 没有列式方法全部是行式方法4 g变量会存储额外的字典不必完全按照df格式
优化的部分
1 修饰器方法支持按依赖定义规则。例如 on depends of [rule1,rule2], def new rule。
保留的部分
1 reinit_data 重新初始化数据
2.1 原型部分
2.1.1 Logging
import logging
from logging.handlers import RotatingFileHandlerdef get_logger(name, lpath/var/log/, moduledefault.default):logger logging.getLogger(name)# 防止重复添加 handlerif not logger.handlers:fpath lpath name .loghandler RotatingFileHandler(fpath, maxBytes100 * 1024 * 1024, backupCount10)# 设置日志格式为 [时间] - [日志级别] - [模块名称] - 消息formatter logging.Formatter([%(asctime)s] - [%(levelname)s] - [{}] - %(message)s.format(module),datefmt%Y-%m-%d %H:%M:%S)handler.setFormatter(formatter)logger.addHandler(handler)logger.setLevel(logging.INFO)return loggerlogger get_logger(example)# 记录不同级别的日志
logger.info([part.a]系统启动完成)
logger.warning([[part.b]磁盘空间不足剩余空间小于10%)
logger.error(无法连接数据库请检查网络设置)
logger.debug(这是调试信息不会显示在日志中)
logger.critical(系统崩溃立即采取措施)2.1.2 BFS 先回忆一下过去的成果当时的结论是使用networkx作为核心的图计算工具而neo4只是后端的存储backup。可以认为是pandas和mysql的关系。 在使用的时候可以为一个项目设置一个独立的名称这个独立的名称也就是节点的label或者可以认为是节点的“表”。在需要的时候可以整个读取(neo4j disk)在内存中处理(networkx memory)。
这段代码定义了一个很小的图
import networkx as nx
import matplotlib.pyplot as plt# 图的定义
# Create a directed graph
G nx.DiGraph()def hello():print(This is Node Running ...)G.add_node(1)
G.nodes[1][name] MinuteData
G.nodes[1][run] helloG.add_node(2)
G.nodes[2][name] CaptialDataDaily
G.nodes[2][run] helloG.add_node(3)
G.nodes[3][name] MergeData
G.nodes[3][run] helloG.add_edge(1,3)
G.add_edge(2,3)G.add_node(4)
G.nodes[4][name] FeatureHorizonal
G.nodes[4][run] helloG.add_edge(3,4)G.add_node(5)
G.nodes[5][name] ImbalanceSample
G.nodes[5][run] helloG.add_edge(4,5)G.add_node(6)
G.nodes[6][name] FeatureVertical
G.nodes[6][run] helloG.add_edge(5,6)# 图的绘画
# 获取节点标签属性
node_labels nx.get_node_attributes(G, name)
# pos nx.shell_layout(G)
pos nx.spring_layout(G)
nx.draw(G, pos, with_labelsFalse, node_size1000, font_size12, font_colorblack, arrowsTrue)
# 绘制节点标签
_ nx.draw_networkx_labels(G, pos, labelsnode_labels) 这里可以看到节点的依赖关系可以很清楚的展示出来。
然后稍微跳一下
# 输入一个nx图给出BFS层级字典
def BFS(some_G,max_depth 100):layer_dict {}# 初始化节点init_node_list [node for node, in_degree in some_G.in_degree() if in_degree 0]layer_dict[0] init_node_list # 节点的入度字典in_degree_dict dict(some_G.in_degree())all_nodes set(some_G.nodes)travel_nodes set(init_node_list)# 迭代节点for i in range(1,max_depth):last_layer_nodes layer_dict[i-1]layer_dict[i] []for last_node in last_layer_nodes:out_nodes list(some_G.successors(last_node))if len(out_nodes):for out_node in out_nodes:out_node_degree in_degree_dict[out_node]out_node_degree1 out_node_degree-1if out_node_degree1 0:layer_dict[i].append(out_node)travel_nodes.add(out_node)else:in_degree_dict[out_node] out_node_degree1gap_set all_nodes - travel_nodesif len(gap_set) 0:breakreturn layer_dictBFS(G)
{0: [1, 2], 1: [3], 2: [4], 3: [5], 4: [6]}给到一个定义好的图通过BFS可以很快把层级梳理出来。
所以将原来的修饰器改一改将节点的依赖关系在启动修饰器的时候解释。函数可以在修饰器下临时定义也可以引用已经编辑好的。现在已经具备了使用形式化参数(如slice_list_batch)来调用函数了既有本地的包也有微服务。
2.1.3 会话 我们将程序的每一次执行视为一次会话。 将程序的每一次执行视为一次会话是一种有用的抽象可以帮助我们追踪、分析和管理程序的行为。每次执行都可以被认为是一个独立的会话这些会话可以包括一系列输入、处理和输出。以下是将程序执行视为会话时的一些要点
1. 会话的定义
每次程序的执行周期从启动到结束被视为一个独立的会话。会话的范围可以根据程序的复杂度定义可能包括启动、执行逻辑、处理数据、生成结果并最终结束。
2. 会话数据
输入数据用户输入或外部数据源提供的信息。上下文信息会话中的环境或系统状态如用户信息、配置设置、会话 ID。日志记录在每个会话中生成的日志信息帮助监控、调试和跟踪程序执行的过程。输出数据会话完成后生成的结果或操作。
3. 会话标识
每个会话可以使用唯一的标识符例如 UUID、时间戳来区分和追踪。日志和监控系统可以根据这个标识符来收集会话信息。
4. 会话的生命周期
开始程序启动或用户发起的操作。执行程序的核心逻辑运行处理输入并生成中间或最终结果。结束程序完成执行或用户操作结束。程序可以写入日志、清理资源或返回结果。
5. 会话状态
成功程序按预期完成所有操作。失败程序执行中出现错误或异常。中断程序由于外部原因或用户取消而中途停止。
6. 会话管理
可以通过记录每次会话的执行时间、状态、输入和输出数据来分析系统的性能和稳定性。会话管理有助于调试当出现问题时可以回溯某一具体会话、分析汇总和统计会话数据以及优化程序。
7. 会话存储
将会话数据存储到数据库、日志文件或分布式系统中以便后续分析或复盘。
通过这种“会话”概念能够更好地组织和管理程序的执行过程尤其在需要跟踪状态、并发操作、或者执行历史时非常有用。
两个需要增加的点(以前没这么实施)
1 生成uuid,用于生成会话的唯一ID
import uuiddef get_uuid(version4, nameNone, namespaceNone):生成 UUID。参数:- version: UUID 版本 (1, 3, 4, 5)- name: 当使用 UUID3 或 UUID5 时需要提供的名称- namespace: 当使用 UUID3 或 UUID5 时需要提供的命名空间 (uuid.NAMESPACE_DNS, uuid.NAMESPACE_URL 等)返回:- 生成的 UUID 字符串if version 1:# 基于时间生成 UUIDreturn uuid.uuid1()elif version 3:if name is None or namespace is None:raise ValueError(UUID3 需要提供 name 和 namespace 参数)# 基于 MD5 哈希的命名空间 UUIDreturn uuid.uuid3(namespace, name)elif version 4:# 生成随机的 UUIDreturn uuid.uuid4()elif version 5:if name is None or namespace is None:raise ValueError(UUID5 需要提供 name 和 namespace 参数)# 基于 SHA-1 哈希的命名空间 UUIDreturn uuid.uuid5(namespace, name)else:raise ValueError(不支持的 UUID 版本。版本应为 1, 3, 4 或 5)2 会话数据存储 在高性能的场景下里面增加的每一个操作可能都会导致系统的不稳定。但是如果是必要的操作那么也不能省。 我问了下大模型自己也想了想觉得还是用kafka比较合适。
python操作kafka一般使用confluent-kafka在有些环境下安装会有点问题。例如我在ubuntu18.04上安装时爆了一些底层错误类似C之类的依赖在20.04上安装就没有问题。但总归要考虑这种环境问题差异会比较麻烦所以我也做了一个kafka_agent,以API的形式提供kafka的访问。缺点是json序列化的过程要加多一次。
我们来考虑当前场景时并不是对每一个请求都发送会话数据
1 正常的执行(INFO)可以考虑按很低的概率抓取会话数据。2 错误(ERROR): 可以完全抓取但这个类型的比例应该本身就是极低的。3 特定的抓取(DEBUG)可以在请求时用特定的字段区分这类型的会话数据会被抓取。
总之需要发起数据存储的概率非常低总体上可能不到1%所以这些额外的开销应该可以接受。反之如果因为会话数据的存储影响了处理说明
1 程序的水准过低错误率太高。2 确实有必要进行并行一边运行一边监控。
如果是程序问题那么就需要不断优化如果是需要同步进行并行检查那么就设置缓冲队列加分布式处理。
使用kafka agent
假设topic为event_collect ,发送一个消息
import requests as req
from pydantic import BaseModel,field_validator
import pandas as pd
import json
import time
class Producer(BaseModel):servers : str raw_msg_list : list is_json : bool True topic : str propertydef msg_list(self):# change raw - json if self.is_json:tick1 time.time()the_list pd.Series(self.raw_msg_list).apply(json.dumps).to_list()print(takes %.2f for json dumps %(time.time() - tick1 ))return the_list else:return self.raw_msg_list回顾一下kafka的搭建,可以使用docker-compose搭建但是我还是比较喜欢直接用docker。
首先需要搭建zookeeper。
docker run -d --restartalways --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT2181 -e \
ZOOKEEPER_TICK_TIME2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100然后再搭kafka假设kafka分为内网和外网监听。 创建kafka持久化的路径
mkdir /home/data2T/kafka_data然后创建
WAN_IPXXXX
LAN_IP192.168.0.159
docker run -d --name kafka \-p xxxx:xxxx \-p 9092:9092 \--link zookeeper:zk \-e HOST_IPlocalhost \-e KAFKA_BROKER_ID1 \-e KAFKA_ZOOKEEPER_CONNECTzk:2181 \-e KAFKA_ADVERTISED_LISTENERSINTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:xxxx \-e KAFKA_LISTENERSINTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:xxxx \-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAPINTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \-e KAFKA_LISTENER_NAMEINTERNAL \-e KAFKA_LISTENER_NAMEEXTERNAL \-e KAFKA_INTER_BROKER_LISTENER_NAMEINTERNAL \-e KAFKA_LOG_DIRS/data/kafka-logs \-v /home/data2T/kafka_data:/data/kafka-logs \registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100这时候就可以使用kafka_agent进行连接了
生产
import requests as req
from pydantic import BaseModel,field_validator
import pandas as pd
import json
import time
class Producer(BaseModel):servers : str raw_msg_list : list is_json : bool True topic : str propertydef msg_list(self):# change raw - json if self.is_json:tick1 time.time()the_list pd.Series(self.raw_msg_list).apply(json.dumps).to_list()print(takes %.2f for json dumps %(time.time() - tick1 ))return the_list else:return self.raw_msg_listmsg_list [{id:i ,value:abc } for i in range(10)]
produces Producer(servers WAN_IP,raw_msg_list msg_list, topicmytest200 )import time
tick1 time.time()
resp req.post(fhttp://{agent_url}/send_msg/,json produces.dict()).json()
tick2 time.time()
print(takes %.2f %(tick2 - tick1) )resp 10
# 外网被占用的情况下耗时比较久
takes 1.44消费
import requests as req
from pydantic import BaseModel,field_validator
import pandas as pd
import json
import time # group.id: 声明不同的group.id 可以重头消费
class InputConsumer(BaseModel):servers : str groupid : str default01is_commit: bool True msg_num : int 3 topic : str is_json : bool True # 外网
the_consumer InputConsumer(servers f{WAN_IP}, msg_num 10, topicmytest202)import time
tick1 time.time()
resp req.post(fhttp://{agent_url}/consume_msg/,json the_consumer.dict()).json()
tick2 time.time()
print(takes %.2f %(tick2 - tick1) )# 内网
lan_the_consumer InputConsumer(servers f{LAN_IP}, msg_num 10, topicmytest202)
import time
tick1 time.time()
resp req.post(fhttp://{agent_url}/consume_msg/,json lan_the_consumer.dict()).json()
tick2 time.time()
print(takes %.2f %(tick2 - tick1) )
我发现在带宽在被占满的情况从公网拉取的消息结果为空但是从内网可以拉取到结果。 原因大致如下对应方法是保证带宽或者在消费端进行修改 当我取消掉模拟耗带宽的操作(rsync大文件),此时无论WAN还是LAN都恢复正常了。 Q1: 使用代理性能是否会有影响 A1: 由于向代理发送接受消息都要经过json序列化效率将会大幅下降。80%以上的开销均为序列化开销。
生产者 10万jsonagent 1秒 外/ 0.78 内仅10万json 0.5秒 消费者 10万条 2.1秒 |1.95 |1.79
但可以看到这样的速度仍然可以大规模使用。 Q2: 如果输错了服务器地址会怎样 A2: 服务将陷入短暂不可用情况。在取消错误的请求后大约5分钟代理重连太久后才会自动取消。
结论保存数据走内网kafka。
其他
Logstash的调试
Logstash 是一个开源的 数据收集引擎通常用于实时数据处理和日志管理。它可以从多种来源收集数据将其过滤、解析并将处理后的数据发送到不同的目标存储系统。Logstash 是 ELK/Elastic StackElasticsearch、Logstash、Kibana的一部分通常与 Elasticsearch 和 Kibana 搭配使用来构建一个完整的日志和事件管理系统。
Logstash 的主要功能 数据收集 Logstash 支持从各种数据源收集数据例如日志文件、数据库、网络、消息队列等。通过插件系统它能够轻松集成到不同的数据源环境中。 数据过滤与解析 Logstash 可以对收集到的数据进行过滤和解析例如使用正则表达式提取字段重新格式化数据或者对数据进行清洗。Logstash 的过滤器插件支持丰富的处理操作比如 Grok 解析、JSON、日期处理、去重、聚合等。 数据输出 Logstash 可以将处理后的数据发送到多个目标系统比如 Elasticsearch用于搜索和分析、文件、数据库、消息队列、监控系统等。
Logstash 主要的架构组件 Inputs输入插件用于指定数据来源如文件、数据库、消息队列等。常见的输入插件包括 file、syslog、kafka、http 等。 Filters过滤插件用于处理、解析和转换数据可以使用 Grok、正则表达式、日期处理等插件来解析复杂的日志格式。 Outputs输出插件用于定义数据的存储位置比如发送到 Elasticsearch、存储到文件、发送到消息队列等。
常见使用场景 日志管理与分析 Logstash 经常与 Elasticsearch 和 Kibana 搭配使用来实现集中式日志管理将来自不同服务的日志集中采集、分析和展示。 实时数据流处理 它还可以用来处理实时数据流例如从 Kafka 或 Redis 获取消息对数据进行实时处理后发送到目标系统。 系统监控与安全分析 在 DevOps 环境中Logstash 用于实时监控应用程序、服务器和网络设备的日志并通过 Kibana 展示给运维人员实现系统健康监控和安全日志分析。
简单工作流程
输入从不同的数据源收集数据如文件、数据库、API 等。过滤通过解析、格式化和过滤等操作对数据进行处理。输出将处理后的数据发送到指定目标如 Elasticsearch、Kafka、文件等。
示例
下面是一个简单的 Logstash 配置它从一个日志文件中收集数据解析后发送到 Elasticsearch
input {file {path /var/log/example.logstart_position beginning}
}filter {grok {match { message %{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message} }}date {match [timestamp, ISO8601]}
}output {elasticsearch {hosts [localhost:9200]index logs-%{YYYY.MM.dd}}
}这个配置会将日志文件中的数据解析为 JSON 格式并按日期创建 Elasticsearch 索引。
Logstash 通过灵活的输入、过滤、输出插件使它成为处理异构数据的强大工具。
日志
这段日志输出来自 Logstash显示了一个日志事件的详细信息。下面是每个字段的解释
9月 15 19:09:07 m7 logstash[23910]: {
9月 15 19:09:07 m7 logstash[23910]: version 1,
9月 15 19:09:07 m7 logstash[23910]: log {
9月 15 19:09:07 m7 logstash[23910]: file {
9月 15 19:09:07 m7 logstash[23910]: path /var/log/example.log
9月 15 19:09:07 m7 logstash[23910]: }
9月 15 19:09:07 m7 logstash[23910]: },
9月 15 19:09:07 m7 logstash[23910]: module part.b,
9月 15 19:09:07 m7 logstash[23910]: log_message 无法连接数据库请检查网络设置,
9月 15 19:09:07 m7 logstash[23910]: timestamp 2024-09-15 19:08:15,
9月 15 19:09:07 m7 logstash[23910]: message [2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库请检查网络设置,
9月 15 19:09:07 m7 logstash[23910]: level ERROR,
9月 15 19:09:07 m7 logstash[23910]: timestamp 2024-09-15T11:09:06.865477838Z,
9月 15 19:09:07 m7 logstash[23910]: event {
9月 15 19:09:07 m7 logstash[23910]: original [2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库请检查网络设置
9月 15 19:09:07 m7 logstash[23910]: },
9月 15 19:09:07 m7 logstash[23910]: host {
9月 15 19:09:07 m7 logstash[23910]: name m7
9月 15 19:09:07 m7 logstash[23910]: }
9月 15 19:09:07 m7 logstash[23910]: }详细解释 version: 指示事件的版本通常为“1”表示使用的事件格式的版本。 log: 包含日志文件的信息。 file: 具体的日志文件信息。 path: 日志文件的完整路径即日志数据的来源。 module: 动态模块名称即日志消息中包含的模块标识。这个值是你在日志记录中自定义的比如在你的代码中你设置为 part.b。 log_message: 从原始消息中提取出的主要日志内容不包含时间戳和日志级别。 timestamp: 日志事件的时间戳表示事件发生的实际时间。 message: 日志的原始格式化消息包含时间戳、日志级别、模块名称和日志内容。 level: 日志的级别例如“ERROR”、“INFO”等。表示事件的严重性。 timestamp: Logstash 处理事件的时间戳通常是 Logstash 解析日志并将其写入 Elasticsearch 的时间。 event: 包含原始日志消息的完整文本通常用于保持日志的原始格式。 host: 提供了有关 Logstash 运行的主机的信息。 name: 主机名显示 Logstash 实例所在的机器名。
这个日志条目展示了从日志文件中提取的数据以及 Logstash 对其进行解析和处理后的结构化数据。
要特别注意日志偏移的设置这个相当于是logstash的断点续传。
在实际生产环境中sincedb_path 选项是用于 Logstash 跟踪文件读取进度的机制默认情况下它不会设置为 /dev/null。下面解释一下它的常用场景和配置方式
1. sincedb 是什么
sincedb 文件用于 Logstash 记录输入插件如 file读取文件的当前位置。每次 Logstash 读取文件时它会更新 sincedb 文件以便在 Logstash 重启或系统重启时能够从上次停止的地方继续读取而不是从头开始。位置默认情况下sincedb 文件存储在用户的主目录下例如 Linux: ~/.sincedb_*Windows: C:\Users\Username\.sincedb_*
每个 sincedb 文件会跟踪一个特定日志文件的 inode 信息及读取进度。
2. 实际场景下 sincedb_path 的使用 正常生产环境 典型配置你通常会为 sincedb_path 指定一个具体的文件路径确保 Logstash 在重启时能够继续处理文件。例如 sincedb_path /var/lib/logstash/sincedb在这个例子中sincedb 文件会存储在 /var/lib/logstash/ 目录下确保 Logstash 有足够的权限去读取和写入该文件。 文件路径管理如果你有多个不同的日志文件输入可以为每个文件输入指定不同的 sincedb_path以避免冲突。例如 input {file {path /var/log/app1.logsincedb_path /var/lib/logstash/sincedb_app1}file {path /var/log/app2.logsincedb_path /var/lib/logstash/sincedb_app2}
}3. sincedb_path /dev/null 在实际中的用途
在某些特殊场景下你可能会临时使用 /dev/null但不建议在生产环境中使用。
使用 /dev/null 的情况 调试/开发阶段 当你在开发或调试 Logstash 配置时你可能希望每次启动 Logstash 时都从头读取日志文件。在这种情况下你可以临时将 sincedb_path 设置为 /dev/null这样 Logstash 每次都会忽略之前的进度从文件的开头开始读取。 sincedb_path /dev/null短期任务 对于一次性读取文件的任务或临时性的日志分析你可能不需要记录进度。在这种情况下使用 /dev/null 也是合理的。
不建议在生产环境中使用的原因
文件读取进度丢失如果你将 sincedb_path 设置为 /dev/nullLogstash 无法保存文件读取进度。在生产环境中如果 Logstash 服务重启或系统出现问题你将丢失已处理文件的位置信息Logstash 会从头开始读取整个日志文件这可能会导致重复处理日志。
4. 总结sincedb_path 在生产环境的最佳实践 指定合适的路径在生产环境中建议明确指定 sincedb_path 到一个持久存储的路径通常位于 /var/lib/logstash 之类的目录确保 Logstash 能记录文件读取进度。 例如 sincedb_path /var/lib/logstash/sincedb_example使用 /dev/null 慎重仅在调试、开发或一次性任务中使用 /dev/null避免在生产环境中使用以防日志文件重新读取时产生问题。 整体上我们写好了程序当逻辑较为复杂时或者我们将之作为服务进行长期运行时容易失联。我们并不知道程序/服务出了什么问题进行定位时需要切到非常细的操作经常达到让人望而却步的程度。 比较可行的方法是程序将日志追加到文件然后由其他程序(如logstash)进行读取解析转存到es中供监控和后续分析。 追加到日志是代价比较低且不会犯错的操作。通过rotate,我们也避免了磁盘满的风险。
日志分为5个级别我们关注其中四个(忽略Debug) 例如Info 可以是类心跳的信息确保程序正常运行无论是Idle还是处理数据。给到的FeedBack是在常态运行。另一个点就是提前准备好可以测试其功能的样本数据隔一段时间调一次确保无论是空载还是满载都能得到反馈。
Warning 是一些预警例如磁盘空间不足、内存空间不足、网络带宽不足等。这些随时可能会导致程序崩溃、挂起。
Error 是一些错误例如数据库连接中断部分数据逻辑处理错误。
Critical 是致命性错误例如显卡出问题了模型无法载入。
日志文件 /var/log/example.log
[2024-09-15 19:08:15] - [INFO] - [part.a] - 系统启动完成
[2024-09-15 19:08:15] - [WARNING] - [part.b] - 磁盘空间不足剩余空间小于10%
[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库请检查网络设置
[2024-09-15 19:08:15] - [CRITICAL] - [part.b] - 系统崩溃立即采取措施写入vim /etc/logstash/conf.d/debug_logstash.conf, grok语句解析4个变量
1 时间戳 timestamp2 日志等级level3 模块名称module4 消息主体log_message input {file {path /var/log/example.logstart_position beginningsincedb_path /dev/null}
}filter {grok {match {message \[%{TIMESTAMP_ISO8601:timestamp}\] - \[%{LOGLEVEL:level}\] - \[%{DATA:module}\] - %{GREEDYDATA:log_message}}}
}output {stdout { codec rubydebug }
}
校验语句
/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/debug_logstash.conf --config.test_and_exit重启服务
systemctl restart logstash观察结果
journalctl -u logstash -f总体上感觉grok解析还是有点麻烦尽量简单点好了倒是JSON解析可能更适合我但是显然效率会稍微低一点。 篇幅太长了再写一篇续吧。
本篇:
1 介绍了问题的由来现状(api)。2 完成了设计思路以及一些对应组件的validate 1 logging : python的logging 和 logstash配合2 graph: 使用 networkx 来进行BFS计算规则之间可以按照nx的方式定义依赖3 uuid: 使用 uuid 来表示会话4 kafka: 回顾kafka的搭建使用kafka agent进行数据提交保存
下篇
1 重构新的规则对象 核心功能允许灵活的定义规则(graph) 2 将本次的日志、会话(uuid及保存)实现3 梳理未来规则分类与合并的思路